1
0
mirror of https://gitlab.com/libvirt/libvirt-python.git synced 2025-07-17 00:59:36 +03:00

examples: Add/fix PEP 484 type annotation

Signed-off-by: Philipp Hahn <hahn@univention.de>
This commit is contained in:
Philipp Hahn
2020-04-27 11:10:13 +02:00
committed by Philipp Hahn
parent 5434ed53ff
commit 9cf539a2a8
11 changed files with 129 additions and 120 deletions

View File

@ -2,11 +2,12 @@
# consolecallback - provide a persistent console that survives guest reboots # consolecallback - provide a persistent console that survives guest reboots
import sys, os, logging, libvirt, tty, termios, atexit import sys, os, logging, libvirt, tty, termios, atexit
from typing import Optional # noqa F401
def reset_term(): def reset_term() -> None:
termios.tcsetattr(0, termios.TCSADRAIN, attrs) termios.tcsetattr(0, termios.TCSADRAIN, attrs)
def error_handler(unused, error): def error_handler(unused, error) -> None:
# The console stream errors on VM shutdown; we don't care # The console stream errors on VM shutdown; we don't care
if (error[0] == libvirt.VIR_ERR_RPC and if (error[0] == libvirt.VIR_ERR_RPC and
error[1] == libvirt.VIR_FROM_STREAMS): error[1] == libvirt.VIR_FROM_STREAMS):
@ -14,20 +15,20 @@ def error_handler(unused, error):
logging.warn(error) logging.warn(error)
class Console(object): class Console(object):
def __init__(self, uri, uuid): def __init__(self, uri: str, uuid: str) -> None:
self.uri = uri self.uri = uri
self.uuid = uuid self.uuid = uuid
self.connection = libvirt.open(uri) self.connection = libvirt.open(uri)
self.domain = self.connection.lookupByUUIDString(uuid) self.domain = self.connection.lookupByUUIDString(uuid)
self.state = self.domain.state(0) self.state = self.domain.state(0)
self.connection.domainEventRegister(lifecycle_callback, self) self.connection.domainEventRegister(lifecycle_callback, self)
self.stream = None self.stream = None # type: Optional[libvirt.virStream]
self.run_console = True self.run_console = True
self.stdin_watch = -1 self.stdin_watch = -1
logging.info("%s initial state %d, reason %d", logging.info("%s initial state %d, reason %d",
self.uuid, self.state[0], self.state[1]) self.uuid, self.state[0], self.state[1])
def check_console(console): def check_console(console: Console) -> bool:
if (console.state[0] == libvirt.VIR_DOMAIN_RUNNING or if (console.state[0] == libvirt.VIR_DOMAIN_RUNNING or
console.state[0] == libvirt.VIR_DOMAIN_PAUSED): console.state[0] == libvirt.VIR_DOMAIN_PAUSED):
if console.stream is None: if console.stream is None:
@ -41,7 +42,7 @@ def check_console(console):
return console.run_console return console.run_console
def stdin_callback(watch, fd, events, console): def stdin_callback(watch: int, fd: int, events: int, console: Console) -> None:
readbuf = os.read(fd, 1024) readbuf = os.read(fd, 1024)
if readbuf.startswith(b""): if readbuf.startswith(b""):
console.run_console = False console.run_console = False
@ -49,7 +50,7 @@ def stdin_callback(watch, fd, events, console):
if console.stream: if console.stream:
console.stream.send(readbuf) console.stream.send(readbuf)
def stream_callback(stream, events, console): def stream_callback(stream: libvirt.virStream, events: int, console: Console) -> None:
try: try:
assert console.stream assert console.stream
received_data = console.stream.recv(1024) received_data = console.stream.recv(1024)
@ -57,7 +58,7 @@ def stream_callback(stream, events, console):
return return
os.write(0, received_data) os.write(0, received_data)
def lifecycle_callback (connection, domain, event, detail, console): def lifecycle_callback(connection: libvirt.virConnect, domain: libvirt.virDomain, event: int, detail: int, console: Console) -> None:
console.state = console.domain.state(0) console.state = console.domain.state(0)
logging.info("%s transitioned to state %d, reason %d", logging.info("%s transitioned to state %d, reason %d",
console.uuid, console.state[0], console.state[1]) console.uuid, console.state[0], console.state[1])

View File

@ -5,7 +5,7 @@ import libvirt
import sys import sys
import time import time
def usage(): def usage() -> None:
print("Usage: %s [URI] NETWORK" % sys.argv[0]) print("Usage: %s [URI] NETWORK" % sys.argv[0])
print(" Print leases info for a given virtual network") print(" Print leases info for a given virtual network")
@ -39,7 +39,7 @@ if not leases:
print("Failed to get leases for %s" % net.name()) print("Failed to get leases for %s" % net.name())
sys.exit(0) sys.exit(0)
def toIPAddrType(addrType): def toIPAddrType(addrType: int) -> str:
if addrType == libvirt.VIR_IP_ADDR_TYPE_IPV4: if addrType == libvirt.VIR_IP_ADDR_TYPE_IPV4:
return "ipv4" return "ipv4"
elif addrType == libvirt.VIR_IP_ADDR_TYPE_IPV6: elif addrType == libvirt.VIR_IP_ADDR_TYPE_IPV6:

View File

@ -6,19 +6,20 @@ import sys
import os import os
import libxml2 import libxml2
import pdb import pdb
from typing import Any
def usage(): def usage() -> None:
print('Usage: %s DOMAIN' % sys.argv[0]) print('Usage: %s DOMAIN' % sys.argv[0])
print(' Print information about the domain DOMAIN') print(' Print information about the domain DOMAIN')
def print_section(title): def print_section(title: str) -> None:
print("\n%s" % title) print("\n%s" % title)
print("=" * 60) print("=" * 60)
def print_entry(key, value): def print_entry(key: str, value: Any) -> None:
print("%-10s %-10s" % (key, value)) print("%-10s %-10s" % (key, value))
def print_xml(key, ctx, path): def print_xml(key: str, ctx, path: str) -> str:
res = ctx.xpathEval(path) res = ctx.xpathEval(path)
if res is None or len(res) == 0: if res is None or len(res) == 0:
value="Unknown" value="Unknown"

View File

@ -10,7 +10,7 @@ IPTYPE = {
} }
def print_dom_ifaces(dom): def print_dom_ifaces(dom: libvirt.virDomain) -> None:
ifaces = dom.interfaceAddresses(libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE) ifaces = dom.interfaceAddresses(libvirt.VIR_DOMAIN_INTERFACE_ADDRESSES_SRC_LEASE)
if ifaces is None: if ifaces is None:
print("Failed to get domain interfaces") print("Failed to get domain interfaces")

View File

@ -7,7 +7,7 @@ import os
import libxml2 import libxml2
import pdb import pdb
def usage(): def usage() -> None:
print('Usage: %s DIR' % sys.argv[0]) print('Usage: %s DIR' % sys.argv[0])
print(' Restore all the domains contained in DIR') print(' Restore all the domains contained in DIR')
print(' It is assumed that all files in DIR are') print(' It is assumed that all files in DIR are')

View File

@ -7,7 +7,7 @@ import os
import libxml2 import libxml2
import pdb import pdb
def usage(): def usage() -> None:
print('Usage: %s DIR' % sys.argv[0]) print('Usage: %s DIR' % sys.argv[0])
print(' Save all currently running domU\'s into DIR') print(' Save all currently running domU\'s into DIR')
print(' DIR must exist and be writable by this process') print(' DIR must exist and be writable by this process')

View File

@ -6,11 +6,12 @@ import sys
import os import os
import libxml2 import libxml2
import pdb import pdb
from typing import Tuple
# Parse the XML description of domU from FNAME # Parse the XML description of domU from FNAME
# and return a tuple (name, xmldesc) where NAME # and return a tuple (name, xmldesc) where NAME
# is the name of the domain, and xmldesc is the contetn of FNAME # is the name of the domain, and xmldesc is the contetn of FNAME
def read_domain(fname): def read_domain(fname: str) -> Tuple[str, str]:
fp = open(fname, "r") fp = open(fname, "r")
xmldesc = fp.read() xmldesc = fp.read()
fp.close() fp.close()
@ -19,7 +20,7 @@ def read_domain(fname):
name = doc.xpathNewContext().xpathEval("/domain/name")[0].content name = doc.xpathNewContext().xpathEval("/domain/name")[0].content
return (name, xmldesc) return (name, xmldesc)
def usage(): def usage() -> None:
print('Usage: %s domain.xml' % sys.argv[0]) print('Usage: %s domain.xml' % sys.argv[0])
print(' Check that the domain described by DOMAIN.XML is running') print(' Check that the domain described by DOMAIN.XML is running')
print(' If the domain is not running, create it') print(' If the domain is not running, create it')

View File

@ -7,9 +7,10 @@ import sys
import os import os
import libxml2 import libxml2
import getpass import getpass
from typing import Any, List
def usage(): def usage() -> None:
print("Usage: %s HOSTNAME" % sys.argv[0]) print("Usage: %s HOSTNAME" % sys.argv[0])
print(" List active domains of HOSTNAME and print some info") print(" List active domains of HOSTNAME and print some info")
@ -28,7 +29,7 @@ def usage():
# #
# The user_data argument is the user data item of the auth argument (see below) # The user_data argument is the user data item of the auth argument (see below)
# passed to libvirt.openAuth(). # passed to libvirt.openAuth().
def request_credentials(credentials, user_data): def request_credentials(credentials: List[List], user_data: Any) -> int:
for credential in credentials: for credential in credentials:
if credential[0] == libvirt.VIR_CRED_AUTHNAME: if credential[0] == libvirt.VIR_CRED_AUTHNAME:
# prompt the user to input a authname. display the provided message # prompt the user to input a authname. display the provided message
@ -50,16 +51,16 @@ def request_credentials(credentials, user_data):
return 0 return 0
def print_section(title): def print_section(title: str) -> None:
print("\n%s" % title) print("\n%s" % title)
print("=" * 60) print("=" * 60)
def print_entry(key, value): def print_entry(key: str, value: Any) -> None:
print("%-10s %-10s" % (key, value)) print("%-10s %-10s" % (key, value))
def print_xml(key, ctx, path): def print_xml(key: str, ctx, path: str) -> str:
res = ctx.xpathEval(path) res = ctx.xpathEval(path)
if res is None or len(res) == 0: if res is None or len(res) == 0:

View File

@ -15,6 +15,10 @@ import select
import errno import errno
import time import time
import threading import threading
from typing import Any, Callable, Dict, List, Optional, TypeVar # noqa F401
_T = TypeVar("_T")
_EventCallback = Callable[[int, int, int, _T], None]
_TimerCallback = Callable[[int, _T], None]
# This example can use three different event loop impls. It defaults # This example can use three different event loop impls. It defaults
@ -34,7 +38,7 @@ event_impl = "poll"
do_debug = False do_debug = False
def debug(msg): def debug(msg: str) -> None:
if do_debug: if do_debug:
print(msg) print(msg)
@ -50,26 +54,26 @@ class virEventLoopPoll:
# This class contains the data we need to track for a # This class contains the data we need to track for a
# single file handle # single file handle
class virEventLoopPollHandle: class virEventLoopPollHandle:
def __init__(self, handle, fd, events, cb, opaque): def __init__(self, handle: int, fd: int, events: int, cb: _EventCallback, opaque: _T):
self.handle = handle self.handle = handle
self.fd = fd self.fd = fd
self.events = events self.events = events
self.cb = cb self.cb = cb
self.opaque = opaque self.opaque = opaque
def get_id(self): def get_id(self) -> int:
return self.handle return self.handle
def get_fd(self): def get_fd(self) -> int:
return self.fd return self.fd
def get_events(self): def get_events(self) -> int:
return self.events return self.events
def set_events(self, events): def set_events(self, events: int):
self.events = events self.events = events
def dispatch(self, events): def dispatch(self, events: int):
self.cb(self.handle, self.cb(self.handle,
self.fd, self.fd,
events, events,
@ -78,29 +82,29 @@ class virEventLoopPoll:
# This class contains the data we need to track for a # This class contains the data we need to track for a
# single periodic timer # single periodic timer
class virEventLoopPollTimer: class virEventLoopPollTimer:
def __init__(self, timer, interval, cb, opaque): def __init__(self, timer: int, interval: int, cb: _TimerCallback, opaque: _T):
self.timer = timer self.timer = timer
self.interval = interval self.interval = interval
self.cb = cb self.cb = cb
self.opaque = opaque self.opaque = opaque
self.lastfired = 0 self.lastfired = 0
def get_id(self): def get_id(self) -> int:
return self.timer return self.timer
def get_interval(self): def get_interval(self) -> int:
return self.interval return self.interval
def set_interval(self, interval): def set_interval(self, interval: int):
self.interval = interval self.interval = interval
def get_last_fired(self): def get_last_fired(self) -> int:
return self.lastfired return self.lastfired
def set_last_fired(self, now): def set_last_fired(self, now: int):
self.lastfired = now self.lastfired = now
def dispatch(self): def dispatch(self) -> None:
self.cb(self.timer, self.cb(self.timer,
self.opaque) self.opaque)
@ -111,8 +115,8 @@ class virEventLoopPoll:
self.runningPoll = False self.runningPoll = False
self.nextHandleID = 1 self.nextHandleID = 1
self.nextTimerID = 1 self.nextTimerID = 1
self.handles = [] self.handles = [] # type: List[virEventLoopPollHandle]
self.timers = [] self.timers = [] # type: List[virEventLoopPollTimer]
self.cleanup = [] self.cleanup = []
self.quit = False self.quit = False
@ -135,7 +139,7 @@ class virEventLoopPoll:
# Calculate when the next timeout is due to occur, returning # Calculate when the next timeout is due to occur, returning
# the absolute timestamp for the next timeout, or 0 if there is # the absolute timestamp for the next timeout, or 0 if there is
# no timeout due # no timeout due
def next_timeout(self): def next_timeout(self) -> int:
next = 0 next = 0
for t in self.timers: for t in self.timers:
last = t.get_last_fired() last = t.get_last_fired()
@ -148,14 +152,14 @@ class virEventLoopPoll:
return next return next
# Lookup a virEventLoopPollHandle object based on file descriptor # Lookup a virEventLoopPollHandle object based on file descriptor
def get_handle_by_fd(self, fd): def get_handle_by_fd(self, fd: int) -> Optional[virEventLoopPollHandle]:
for h in self.handles: for h in self.handles:
if h.get_fd() == fd: if h.get_fd() == fd:
return h return h
return None return None
# Lookup a virEventLoopPollHandle object based on its event loop ID # Lookup a virEventLoopPollHandle object based on its event loop ID
def get_handle_by_id(self, handleID): def get_handle_by_id(self, handleID: int) -> Optional[virEventLoopPollHandle]:
for h in self.handles: for h in self.handles:
if h.get_id() == handleID: if h.get_id() == handleID:
return h return h
@ -180,8 +184,8 @@ class virEventLoopPoll:
# back around the loop with a crazy 5ms sleep. So when checking # back around the loop with a crazy 5ms sleep. So when checking
# if timeouts are due, we allow a margin of 20ms, to avoid # if timeouts are due, we allow a margin of 20ms, to avoid
# these pointless repeated tiny sleeps. # these pointless repeated tiny sleeps.
def run_once(self): def run_once(self) -> None:
sleep = -1 sleep = -1 # type: float
self.runningPoll = True self.runningPoll = True
for opaque in self.cleanup: for opaque in self.cleanup:
@ -237,12 +241,12 @@ class virEventLoopPoll:
self.runningPoll = False self.runningPoll = False
# Actually run the event loop forever # Actually run the event loop forever
def run_loop(self): def run_loop(self) -> None:
self.quit = False self.quit = False
while not self.quit: while not self.quit:
self.run_once() self.run_once()
def interrupt(self): def interrupt(self) -> None:
if self.runningPoll and not self.pendingWakeup: if self.runningPoll and not self.pendingWakeup:
self.pendingWakeup = True self.pendingWakeup = True
os.write(self.pipetrick[1], 'c'.encode("UTF-8")) os.write(self.pipetrick[1], 'c'.encode("UTF-8"))
@ -251,7 +255,7 @@ class virEventLoopPoll:
# event constants), firing the callback cb() when an event occurs. # event constants), firing the callback cb() when an event occurs.
# Returns a unique integer identier for this handle, that should be # Returns a unique integer identier for this handle, that should be
# used to later update/remove it # used to later update/remove it
def add_handle(self, fd, events, cb, opaque): def add_handle(self, fd: int, events: int, cb: _EventCallback, opaque: _T) -> int:
handleID = self.nextHandleID + 1 handleID = self.nextHandleID + 1
self.nextHandleID = self.nextHandleID + 1 self.nextHandleID = self.nextHandleID + 1
@ -270,7 +274,7 @@ class virEventLoopPoll:
# then the timer is registered, but not enabled # then the timer is registered, but not enabled
# Returns a unique integer identier for this handle, that should be # Returns a unique integer identier for this handle, that should be
# used to later update/remove it # used to later update/remove it
def add_timer(self, interval, cb, opaque): def add_timer(self, interval: int, cb: _TimerCallback, opaque: _T) -> int:
timerID = self.nextTimerID + 1 timerID = self.nextTimerID + 1
self.nextTimerID = self.nextTimerID + 1 self.nextTimerID = self.nextTimerID + 1
@ -283,7 +287,7 @@ class virEventLoopPoll:
return timerID return timerID
# Change the set of events to be monitored on the file handle # Change the set of events to be monitored on the file handle
def update_handle(self, handleID, events): def update_handle(self, handleID: int, events: int) -> None:
h = self.get_handle_by_id(handleID) h = self.get_handle_by_id(handleID)
if h: if h:
h.set_events(events) h.set_events(events)
@ -294,7 +298,7 @@ class virEventLoopPoll:
debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events)) debug("Update handle %d fd %d events %d" % (handleID, h.get_fd(), events))
# Change the periodic frequency of the timer # Change the periodic frequency of the timer
def update_timer(self, timerID, interval): def update_timer(self, timerID: int, interval: int) -> None:
for h in self.timers: for h in self.timers:
if h.get_id() == timerID: if h.get_id() == timerID:
h.set_interval(interval) h.set_interval(interval)
@ -304,7 +308,7 @@ class virEventLoopPoll:
break break
# Stop monitoring for events on the file handle # Stop monitoring for events on the file handle
def remove_handle(self, handleID): def remove_handle(self, handleID: int) -> None:
handles = [] handles = []
for h in self.handles: for h in self.handles:
if h.get_id() == handleID: if h.get_id() == handleID:
@ -317,7 +321,7 @@ class virEventLoopPoll:
self.interrupt() self.interrupt()
# Stop firing the periodic timer # Stop firing the periodic timer
def remove_timer(self, timerID): def remove_timer(self, timerID: int) -> None:
timers = [] timers = []
for h in self.timers: for h in self.timers:
if h.get_id() != timerID: if h.get_id() != timerID:
@ -329,7 +333,7 @@ class virEventLoopPoll:
self.interrupt() self.interrupt()
# Convert from libvirt event constants, to poll() events constants # Convert from libvirt event constants, to poll() events constants
def events_to_poll(self, events): def events_to_poll(self, events: int) -> int:
ret = 0 ret = 0
if events & libvirt.VIR_EVENT_HANDLE_READABLE: if events & libvirt.VIR_EVENT_HANDLE_READABLE:
ret |= select.POLLIN ret |= select.POLLIN
@ -342,7 +346,7 @@ class virEventLoopPoll:
return ret return ret
# Convert from poll() event constants, to libvirt events constants # Convert from poll() event constants, to libvirt events constants
def events_from_poll(self, events): def events_from_poll(self, events: int) -> int:
ret = 0 ret = 0
if events & select.POLLIN: if events & select.POLLIN:
ret |= libvirt.VIR_EVENT_HANDLE_READABLE ret |= libvirt.VIR_EVENT_HANDLE_READABLE
@ -378,33 +382,33 @@ eventLoopThread = None
# another event loop such as GLib's, or something like the python # another event loop such as GLib's, or something like the python
# Twisted event framework. # Twisted event framework.
def virEventAddHandleImpl(fd, events, cb, opaque): def virEventAddHandleImpl(fd: int, events: int, cb: _EventCallback, opaque: _T) -> int:
return eventLoop.add_handle(fd, events, cb, opaque) return eventLoop.add_handle(fd, events, cb, opaque)
def virEventUpdateHandleImpl(handleID, events): def virEventUpdateHandleImpl(handleID: int, events: int) -> None:
return eventLoop.update_handle(handleID, events) return eventLoop.update_handle(handleID, events)
def virEventRemoveHandleImpl(handleID): def virEventRemoveHandleImpl(handleID: int) -> None:
return eventLoop.remove_handle(handleID) return eventLoop.remove_handle(handleID)
def virEventAddTimerImpl(interval, cb, opaque): def virEventAddTimerImpl(interval: int, cb: _TimerCallback, opaque: _T) -> int:
return eventLoop.add_timer(interval, cb, opaque) return eventLoop.add_timer(interval, cb, opaque)
def virEventUpdateTimerImpl(timerID, interval): def virEventUpdateTimerImpl(timerID: int, interval: int) -> None:
return eventLoop.update_timer(timerID, interval) return eventLoop.update_timer(timerID, interval)
def virEventRemoveTimerImpl(timerID): def virEventRemoveTimerImpl(timerID: int) -> None:
return eventLoop.remove_timer(timerID) return eventLoop.remove_timer(timerID)
# This tells libvirt what event loop implementation it # This tells libvirt what event loop implementation it
# should use # should use
def virEventLoopPollRegister(): def virEventLoopPollRegister() -> None:
libvirt.virEventRegisterImpl(virEventAddHandleImpl, libvirt.virEventRegisterImpl(virEventAddHandleImpl,
virEventUpdateHandleImpl, virEventUpdateHandleImpl,
virEventRemoveHandleImpl, virEventRemoveHandleImpl,
@ -414,23 +418,23 @@ def virEventLoopPollRegister():
# Directly run the event loop in the current thread # Directly run the event loop in the current thread
def virEventLoopPollRun(): def virEventLoopPollRun() -> None:
eventLoop.run_loop() eventLoop.run_loop()
def virEventLoopAIORun(loop): def virEventLoopAIORun(loop) -> None:
import asyncio import asyncio
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
loop.run_forever() loop.run_forever()
def virEventLoopNativeRun(): def virEventLoopNativeRun() -> None:
while True: while True:
libvirt.virEventRunDefaultImpl() libvirt.virEventRunDefaultImpl()
# Spawn a background thread to run the event loop # Spawn a background thread to run the event loop
def virEventLoopPollStart(): def virEventLoopPollStart() -> None:
global eventLoopThread global eventLoopThread
virEventLoopPollRegister() virEventLoopPollRegister()
eventLoopThread = threading.Thread(target=virEventLoopPollRun, name="libvirtEventLoop") eventLoopThread = threading.Thread(target=virEventLoopPollRun, name="libvirtEventLoop")
@ -438,7 +442,7 @@ def virEventLoopPollStart():
eventLoopThread.start() eventLoopThread.start()
def virEventLoopAIOStart(): def virEventLoopAIOStart() -> None:
global eventLoopThread global eventLoopThread
import libvirtaio import libvirtaio
import asyncio import asyncio
@ -449,7 +453,7 @@ def virEventLoopAIOStart():
eventLoopThread.start() eventLoopThread.start()
def virEventLoopNativeStart(): def virEventLoopNativeStart() -> None:
global eventLoopThread global eventLoopThread
libvirt.virEventRegisterDefaultImpl() libvirt.virEventRegisterDefaultImpl()
eventLoopThread = threading.Thread(target=virEventLoopNativeRun, name="libvirtEventLoop") eventLoopThread = threading.Thread(target=virEventLoopNativeRun, name="libvirtEventLoop")
@ -463,14 +467,14 @@ def virEventLoopNativeStart():
class Description(object): class Description(object):
__slots__ = ('desc', 'args') __slots__ = ('desc', 'args')
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs) -> None:
self.desc = kwargs.get('desc') self.desc = kwargs.get('desc')
self.args = args self.args = args
def __str__(self): # type: () -> str def __str__(self) -> str:
return self.desc return self.desc or ''
def __getitem__(self, item): # type: (int) -> str def __getitem__(self, item: int) -> 'Description':
try: try:
data = self.args[item] data = self.args[item]
except IndexError: except IndexError:
@ -507,127 +511,127 @@ DISK_EVENTS = Description("Change missing on start", "Drop missing on start")
TRAY_EVENTS = Description("Opened", "Closed") TRAY_EVENTS = Description("Opened", "Closed")
def myDomainEventCallback(conn, dom, event, detail, opaque): def myDomainEventCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, event: int, detail: int, opaque: _T) -> None:
print("myDomainEventCallback%s EVENT: Domain %s(%s) %s %s" % ( print("myDomainEventCallback%s EVENT: Domain %s(%s) %s %s" % (
opaque, dom.name(), dom.ID(), DOM_EVENTS[event], DOM_EVENTS[event][detail])) opaque, dom.name(), dom.ID(), DOM_EVENTS[event], DOM_EVENTS[event][detail]))
def myDomainEventRebootCallback(conn, dom, opaque): def myDomainEventRebootCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, opaque: _T) -> None:
print("myDomainEventRebootCallback: Domain %s(%s)" % ( print("myDomainEventRebootCallback: Domain %s(%s)" % (
dom.name(), dom.ID())) dom.name(), dom.ID()))
def myDomainEventRTCChangeCallback(conn, dom, utcoffset, opaque): def myDomainEventRTCChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, utcoffset: int, opaque: _T) -> None:
print("myDomainEventRTCChangeCallback: Domain %s(%s) %d" % ( print("myDomainEventRTCChangeCallback: Domain %s(%s) %d" % (
dom.name(), dom.ID(), utcoffset)) dom.name(), dom.ID(), utcoffset))
def myDomainEventWatchdogCallback(conn, dom, action, opaque): def myDomainEventWatchdogCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, action: int, opaque: _T) -> None:
print("myDomainEventWatchdogCallback: Domain %s(%s) %s" % ( print("myDomainEventWatchdogCallback: Domain %s(%s) %s" % (
dom.name(), dom.ID(), WATCHDOG_ACTIONS[action])) dom.name(), dom.ID(), WATCHDOG_ACTIONS[action]))
def myDomainEventIOErrorCallback(conn, dom, srcpath, devalias, action, opaque): def myDomainEventIOErrorCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, srcpath: str, devalias: str, action: int, opaque: _T) -> None:
print("myDomainEventIOErrorCallback: Domain %s(%s) %s %s %s" % ( print("myDomainEventIOErrorCallback: Domain %s(%s) %s %s %s" % (
dom.name(), dom.ID(), srcpath, devalias, ERROR_EVENTS[action])) dom.name(), dom.ID(), srcpath, devalias, ERROR_EVENTS[action]))
def myDomainEventIOErrorReasonCallback(conn, dom, srcpath, devalias, action, reason, opaque): def myDomainEventIOErrorReasonCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, srcpath: str, devalias: str, action: int, reason: int, opaque: _T) -> None:
print("myDomainEventIOErrorReasonCallback: Domain %s(%s) %s %s %s %s" % ( print("myDomainEventIOErrorReasonCallback: Domain %s(%s) %s %s %s %s" % (
dom.name(), dom.ID(), srcpath, devalias, ERROR_EVENTS[action], reason)) dom.name(), dom.ID(), srcpath, devalias, ERROR_EVENTS[action], reason))
def myDomainEventGraphicsCallback(conn, dom, phase, localAddr, remoteAddr, authScheme, subject, opaque): def myDomainEventGraphicsCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, phase: int, localAddr: str, remoteAddr: str, authScheme: str, subject: str, opaque: _T) -> None:
print("myDomainEventGraphicsCallback: Domain %s(%s) %s %s" % ( print("myDomainEventGraphicsCallback: Domain %s(%s) %s %s" % (
dom.name(), dom.ID(), GRAPHICS_PHASES[phase], authScheme)) dom.name(), dom.ID(), GRAPHICS_PHASES[phase], authScheme))
def myDomainEventControlErrorCallback(conn, dom, opaque): def myDomainEventControlErrorCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, opaque: _T) -> None:
print("myDomainEventControlErrorCallback: Domain %s(%s)" % ( print("myDomainEventControlErrorCallback: Domain %s(%s)" % (
dom.name(), dom.ID())) dom.name(), dom.ID()))
def myDomainEventBlockJobCallback(conn, dom, disk, type, status, opaque): def myDomainEventBlockJobCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, disk, type: int, status: int, opaque: _T) -> None:
print("myDomainEventBlockJobCallback: Domain %s(%s) %s on disk %s %s" % ( print("myDomainEventBlockJobCallback: Domain %s(%s) %s on disk %s %s" % (
dom.name(), dom.ID(), BLOCK_JOB_TYPES[type], disk, BLOCK_JOB_STATUS[status])) dom.name(), dom.ID(), BLOCK_JOB_TYPES[type], disk, BLOCK_JOB_STATUS[status]))
def myDomainEventDiskChangeCallback(conn, dom, oldSrcPath, newSrcPath, devAlias, reason, opaque): def myDomainEventDiskChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, oldSrcPath: str, newSrcPath: str, devAlias: str, reason: int, opaque: _T) -> None:
print("myDomainEventDiskChangeCallback: Domain %s(%s) disk change oldSrcPath: %s newSrcPath: %s devAlias: %s reason: %s" % ( print("myDomainEventDiskChangeCallback: Domain %s(%s) disk change oldSrcPath: %s newSrcPath: %s devAlias: %s reason: %s" % (
dom.name(), dom.ID(), oldSrcPath, newSrcPath, devAlias, DISK_EVENTS[reason])) dom.name(), dom.ID(), oldSrcPath, newSrcPath, devAlias, DISK_EVENTS[reason]))
def myDomainEventTrayChangeCallback(conn, dom, devAlias, reason, opaque): def myDomainEventTrayChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, devAlias: str, reason: int, opaque: _T) -> None:
print("myDomainEventTrayChangeCallback: Domain %s(%s) tray change devAlias: %s reason: %s" % ( print("myDomainEventTrayChangeCallback: Domain %s(%s) tray change devAlias: %s reason: %s" % (
dom.name(), dom.ID(), devAlias, TRAY_EVENTS[reason])) dom.name(), dom.ID(), devAlias, TRAY_EVENTS[reason]))
def myDomainEventPMWakeupCallback(conn, dom, reason, opaque): def myDomainEventPMWakeupCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, reason: int, opaque: _T) -> None:
print("myDomainEventPMWakeupCallback: Domain %s(%s) system pmwakeup" % ( print("myDomainEventPMWakeupCallback: Domain %s(%s) system pmwakeup" % (
dom.name(), dom.ID())) dom.name(), dom.ID()))
def myDomainEventPMSuspendCallback(conn, dom, reason, opaque): def myDomainEventPMSuspendCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, reason: int, opaque: _T) -> None:
print("myDomainEventPMSuspendCallback: Domain %s(%s) system pmsuspend" % ( print("myDomainEventPMSuspendCallback: Domain %s(%s) system pmsuspend" % (
dom.name(), dom.ID())) dom.name(), dom.ID()))
def myDomainEventBalloonChangeCallback(conn, dom, actual, opaque): def myDomainEventBalloonChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, actual: int, opaque: _T) -> None:
print("myDomainEventBalloonChangeCallback: Domain %s(%s) %d" % ( print("myDomainEventBalloonChangeCallback: Domain %s(%s) %d" % (
dom.name(), dom.ID(), actual)) dom.name(), dom.ID(), actual))
def myDomainEventPMSuspendDiskCallback(conn, dom, reason, opaque): def myDomainEventPMSuspendDiskCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, reason: int, opaque: _T) -> None:
print("myDomainEventPMSuspendDiskCallback: Domain %s(%s) system pmsuspend_disk" % ( print("myDomainEventPMSuspendDiskCallback: Domain %s(%s) system pmsuspend_disk" % (
dom.name(), dom.ID())) dom.name(), dom.ID()))
def myDomainEventDeviceRemovedCallback(conn, dom, dev, opaque): def myDomainEventDeviceRemovedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, opaque: _T) -> None:
print("myDomainEventDeviceRemovedCallback: Domain %s(%s) device removed: %s" % ( print("myDomainEventDeviceRemovedCallback: Domain %s(%s) device removed: %s" % (
dom.name(), dom.ID(), dev)) dom.name(), dom.ID(), dev))
def myDomainEventBlockJob2Callback(conn, dom, disk, type, status, opaque): def myDomainEventBlockJob2Callback(conn: libvirt.virConnect, dom: libvirt.virDomain, disk: str, type: int, status: int, opaque: _T) -> None:
print("myDomainEventBlockJob2Callback: Domain %s(%s) %s on disk %s %s" % ( print("myDomainEventBlockJob2Callback: Domain %s(%s) %s on disk %s %s" % (
dom.name(), dom.ID(), BLOCK_JOB_TYPES[type], disk, BLOCK_JOB_STATUS[status])) dom.name(), dom.ID(), BLOCK_JOB_TYPES[type], disk, BLOCK_JOB_STATUS[status]))
def myDomainEventTunableCallback(conn, dom, params, opaque): def myDomainEventTunableCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, params: Dict[str, Any], opaque: _T) -> None:
print("myDomainEventTunableCallback: Domain %s(%s) %s" % ( print("myDomainEventTunableCallback: Domain %s(%s) %s" % (
dom.name(), dom.ID(), params)) dom.name(), dom.ID(), params))
def myDomainEventAgentLifecycleCallback(conn, dom, state, reason, opaque): def myDomainEventAgentLifecycleCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, state: int, reason: int, opaque: _T) -> None:
print("myDomainEventAgentLifecycleCallback: Domain %s(%s) %s %s" % ( print("myDomainEventAgentLifecycleCallback: Domain %s(%s) %s %s" % (
dom.name(), dom.ID(), AGENT_STATES[state], AGENT_REASONS[reason])) dom.name(), dom.ID(), AGENT_STATES[state], AGENT_REASONS[reason]))
def myDomainEventDeviceAddedCallback(conn, dom, dev, opaque): def myDomainEventDeviceAddedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, opaque: _T) -> None:
print("myDomainEventDeviceAddedCallback: Domain %s(%s) device added: %s" % ( print("myDomainEventDeviceAddedCallback: Domain %s(%s) device added: %s" % (
dom.name(), dom.ID(), dev)) dom.name(), dom.ID(), dev))
def myDomainEventMigrationIteration(conn, dom, iteration, opaque): def myDomainEventMigrationIteration(conn: libvirt.virConnect, dom: libvirt.virDomain, iteration: int, opaque: _T) -> None:
print("myDomainEventMigrationIteration: Domain %s(%s) started migration iteration %d" % ( print("myDomainEventMigrationIteration: Domain %s(%s) started migration iteration %d" % (
dom.name(), dom.ID(), iteration)) dom.name(), dom.ID(), iteration))
def myDomainEventJobCompletedCallback(conn, dom, params, opaque): def myDomainEventJobCompletedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, params: Dict[str, Any], opaque: _T) -> None:
print("myDomainEventJobCompletedCallback: Domain %s(%s) %s" % ( print("myDomainEventJobCompletedCallback: Domain %s(%s) %s" % (
dom.name(), dom.ID(), params)) dom.name(), dom.ID(), params))
def myDomainEventDeviceRemovalFailedCallback(conn, dom, dev, opaque): def myDomainEventDeviceRemovalFailedCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, opaque: _T) -> None:
print("myDomainEventDeviceRemovalFailedCallback: Domain %s(%s) failed to remove device: %s" % ( print("myDomainEventDeviceRemovalFailedCallback: Domain %s(%s) failed to remove device: %s" % (
dom.name(), dom.ID(), dev)) dom.name(), dom.ID(), dev))
def myDomainEventMetadataChangeCallback(conn, dom, mtype, nsuri, opaque): def myDomainEventMetadataChangeCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, mtype: int, nsuri: str, opaque: _T) -> None:
print("myDomainEventMetadataChangeCallback: Domain %s(%s) changed metadata mtype=%d nsuri=%s" % ( print("myDomainEventMetadataChangeCallback: Domain %s(%s) changed metadata mtype=%d nsuri=%s" % (
dom.name(), dom.ID(), mtype, nsuri)) dom.name(), dom.ID(), mtype, nsuri))
def myDomainEventBlockThresholdCallback(conn, dom, dev, path, threshold, excess, opaque): def myDomainEventBlockThresholdCallback(conn: libvirt.virConnect, dom: libvirt.virDomain, dev: str, path: str, threshold: int, excess: int, opaque: _T) -> None:
print("myDomainEventBlockThresholdCallback: Domain %s(%s) block device %s(%s) threshold %d exceeded by %d" % ( print("myDomainEventBlockThresholdCallback: Domain %s(%s) block device %s(%s) threshold %d exceeded by %d" % (
dom.name(), dom.ID(), dev, path, threshold, excess)) dom.name(), dom.ID(), dev, path, threshold, excess))
@ -643,7 +647,7 @@ NET_EVENTS = Description(
) )
def myNetworkEventLifecycleCallback(conn, net, event, detail, opaque): def myNetworkEventLifecycleCallback(conn: libvirt.virConnect, net: libvirt.virNetwork, event: int, detail: int, opaque: _T) -> None:
print("myNetworkEventLifecycleCallback: Network %s %s %s" % ( print("myNetworkEventLifecycleCallback: Network %s %s %s" % (
net.name(), NET_EVENTS[event], NET_EVENTS[event][detail])) net.name(), NET_EVENTS[event], NET_EVENTS[event][detail]))
@ -661,12 +665,12 @@ STORAGE_EVENTS = Description(
) )
def myStoragePoolEventLifecycleCallback(conn, pool, event, detail, opaque): def myStoragePoolEventLifecycleCallback(conn: libvirt.virConnect, pool: libvirt.virStoragePool, event: int, detail: int, opaque: _T) -> None:
print("myStoragePoolEventLifecycleCallback: Storage pool %s %s %s" % ( print("myStoragePoolEventLifecycleCallback: Storage pool %s %s %s" % (
pool.name(), STORAGE_EVENTS[event], STORAGE_EVENTS[event][detail])) pool.name(), STORAGE_EVENTS[event], STORAGE_EVENTS[event][detail]))
def myStoragePoolEventRefreshCallback(conn, pool, opaque): def myStoragePoolEventRefreshCallback(conn: libvirt.virConnect, pool: libvirt.virStoragePool, opaque: _T) -> None:
print("myStoragePoolEventRefreshCallback: Storage pool %s" % pool.name()) print("myStoragePoolEventRefreshCallback: Storage pool %s" % pool.name())
@ -679,12 +683,12 @@ DEVICE_EVENTS = Description(
) )
def myNodeDeviceEventLifecycleCallback(conn, dev, event, detail, opaque): def myNodeDeviceEventLifecycleCallback(conn: libvirt.virConnect, dev: libvirt.virNodeDevice, event: int, detail: int, opaque: _T) -> None:
print("myNodeDeviceEventLifecycleCallback: Node device %s %s %s" % ( print("myNodeDeviceEventLifecycleCallback: Node device %s %s %s" % (
dev.name(), DEVICE_EVENTS[event], DEVICE_EVENTS[event][detail])) dev.name(), DEVICE_EVENTS[event], DEVICE_EVENTS[event][detail]))
def myNodeDeviceEventUpdateCallback(conn, dev, opaque): def myNodeDeviceEventUpdateCallback(conn: libvirt.virConnect, dev: libvirt.virNodeDevice, opaque: _T) -> None:
print("myNodeDeviceEventUpdateCallback: Node device %s" % dev.name()) print("myNodeDeviceEventUpdateCallback: Node device %s" % dev.name())
@ -697,12 +701,12 @@ SECRET_EVENTS = Description(
) )
def mySecretEventLifecycleCallback(conn, secret, event, detail, opaque): def mySecretEventLifecycleCallback(conn: libvirt.virConnect, secret: libvirt.virSecret, event: int, detail: int, opaque: _T) -> None:
print("mySecretEventLifecycleCallback: Secret %s %s %s" % ( print("mySecretEventLifecycleCallback: Secret %s %s %s" % (
secret.UUIDString(), SECRET_EVENTS[event], SECRET_EVENTS[event][detail])) secret.UUIDString(), SECRET_EVENTS[event], SECRET_EVENTS[event][detail]))
def mySecretEventValueChanged(conn, secret, opaque): def mySecretEventValueChanged(conn: libvirt.virConnect, secret: libvirt.virSecret, opaque: _T) -> None:
print("mySecretEventValueChanged: Secret %s" % secret.UUIDString()) print("mySecretEventValueChanged: Secret %s" % secret.UUIDString())
@ -714,14 +718,14 @@ run = True
CONNECTION_EVENTS = Description("Error", "End-of-file", "Keepalive", "Client") CONNECTION_EVENTS = Description("Error", "End-of-file", "Keepalive", "Client")
def myConnectionCloseCallback(conn, reason, opaque): def myConnectionCloseCallback(conn: libvirt.virConnect, reason: int, opaque: _T) -> None:
print("myConnectionCloseCallback: %s: %s" % ( print("myConnectionCloseCallback: %s: %s" % (
conn.getURI(), CONNECTION_EVENTS[reason])) conn.getURI(), CONNECTION_EVENTS[reason]))
global run global run
run = False run = False
def usage(): def usage() -> None:
print("usage: %s [-hdl] [uri]" % (os.path.basename(__file__),)) print("usage: %s [-hdl] [uri]" % (os.path.basename(__file__),))
print(" uri will default to qemu:///system") print(" uri will default to qemu:///system")
print(" --help, -h Print this help message") print(" --help, -h Print this help message")
@ -730,7 +734,7 @@ def usage():
print(" --timeout=SECS Quit after SECS seconds running") print(" --timeout=SECS Quit after SECS seconds running")
def main(): def main() -> None:
try: try:
opts, args = getopt.getopt(sys.argv[1:], "hdl:", ["help", "debug", "loop=", "timeout="]) opts, args = getopt.getopt(sys.argv[1:], "hdl:", ["help", "debug", "loop=", "timeout="])
except getopt.GetoptError as err: except getopt.GetoptError as err:
@ -770,7 +774,7 @@ def main():
vc = libvirt.openReadOnly(uri) vc = libvirt.openReadOnly(uri)
# Close connection on exit (to test cleanup paths) # Close connection on exit (to test cleanup paths)
def exit(): def exit() -> None:
print("Closing " + vc.getURI()) print("Closing " + vc.getURI())
if run: if run:
vc.close() vc.close()

View File

@ -8,11 +8,12 @@ import libvirt
import sys import sys
from xml.dom import minidom from xml.dom import minidom
import libxml2 import libxml2
from typing import Any, Dict # noqa F401
def xpath_eval(ctxt, path): def xpath_eval(ctxt, path: str) -> str:
res = ctxt.xpathEval(path) res = ctxt.xpathEval(path)
if res is None or len(res) == 0: if res is None or len(res) == 0:
value = None value = ''
else: else:
value = res[0].content value = res[0].content
return value return value
@ -44,7 +45,7 @@ domsStrict = [ proc
for proc in doms for proc in doms
if proc.numaParameters()["numa_mode"] == libvirt.VIR_DOMAIN_NUMATUNE_MEM_STRICT ] if proc.numaParameters()["numa_mode"] == libvirt.VIR_DOMAIN_NUMATUNE_MEM_STRICT ]
domsStrictCfg = {} domsStrictCfg = {} # type: Dict[libvirt.virDomain, Dict[str, Dict[str, Any]]]
for dom in domsStrict: for dom in domsStrict:
xmlStr = dom.XMLDesc() xmlStr = dom.XMLDesc()
doc = libxml2.parseDoc(xmlStr) doc = libxml2.parseDoc(xmlStr)
@ -67,8 +68,8 @@ for dom in domsStrict:
print("NUMA stats") print("NUMA stats")
print("NUMA nodes:\t" + "\t".join(str(node) for node in nodesIDs)) print("NUMA nodes:\t" + "\t".join(str(node) for node in nodesIDs))
print("MemTotal:\t" + "\t".join(str(i.get("total") // 1024) for i in nodesMem)) print("MemTotal:\t" + "\t".join(str(i.get("total") // 1024) for i in nodesMem)) # type: ignore
print("MemFree:\t" + "\t".join(str(i.get("free") // 1024) for i in nodesMem)) print("MemFree:\t" + "\t".join(str(i.get("free") // 1024) for i in nodesMem)) # type: ignore
for dom, v in domsStrictCfg.items(): for dom, v in domsStrictCfg.items():
print("Domain '%s':\t" % dom.name()) print("Domain '%s':\t" % dom.name())

View File

@ -6,24 +6,24 @@
import libvirt, sys, os import libvirt, sys, os
def bytesWriteHandler(stream, buf, opaque): def bytesWriteHandler(stream: libvirt.virStream, buf: bytes, opaque: int) -> int:
fd = opaque fd = opaque
return os.write(fd, buf) return os.write(fd, buf)
def bytesReadHandler(stream, nbytes, opaque): def bytesReadHandler(stream: libvirt.virStream, nbytes: int, opaque: int) -> bytes:
fd = opaque fd = opaque
return os.read(fd, nbytes) return os.read(fd, nbytes)
def recvSkipHandler(stream, length, opaque): def recvSkipHandler(stream: libvirt.virStream, length: int, opaque: int) -> None:
fd = opaque fd = opaque
cur = os.lseek(fd, length, os.SEEK_CUR) cur = os.lseek(fd, length, os.SEEK_CUR)
return os.ftruncate(fd, cur) return os.ftruncate(fd, cur)
def sendSkipHandler(stream, length, opaque): def sendSkipHandler(stream: libvirt.virStream, length: int, opaque: int) -> int:
fd = opaque fd = opaque
return os.lseek(fd, length, os.SEEK_CUR) return os.lseek(fd, length, os.SEEK_CUR)
def holeHandler(stream, opaque): def holeHandler(stream: libvirt.virStream, opaque: int):
fd = opaque fd = opaque
cur = os.lseek(fd, 0, os.SEEK_CUR) cur = os.lseek(fd, 0, os.SEEK_CUR)
@ -73,7 +73,7 @@ def holeHandler(stream, opaque):
os.lseek(fd, cur, os.SEEK_SET) os.lseek(fd, cur, os.SEEK_SET)
return [inData, sectionLen] return [inData, sectionLen]
def download(vol, st, filename): def download(vol: libvirt.virStorageVol, st: libvirt.virStream, filename: str) -> None:
offset = 0 offset = 0
length = 0 length = 0
@ -83,7 +83,7 @@ def download(vol, st, filename):
os.close(fd) os.close(fd)
def upload(vol, st, filename): def upload(vol: libvirt.virStorageVol, st: libvirt.virStream, filename: str) -> None:
offset = 0 offset = 0
length = 0 length = 0