mirror of
https://gitlab.com/libvirt/libvirt-python.git
synced 2024-10-26 16:25:10 +03:00
8c800b0adf
This reverts commit f4be03b330
. While object.__init__() does not expect any additional arguments, this construct is required for Python's multiple inheritance implementation. The original author Wojtek Porczyk <woju@invisiblethingslab.com> explained it this way: > I'm sorry I didn't notice this earlier, but the commit f4be03b3
dated > 2020-04-20 [0] is wrong. The super().__init__(*args, **kwargs) in > Callback.__init__ was there on purpose, because of how Python's inheritance in > new-style classes works. > > Let me explain this a bit, because it is not obvious. > > Suppose you had diamond inheritance like this: > > class A(object): pass > class B(A): pass > class C(A): pass > class D(B,C): pass > > And those classes needed a common function with varying arguments: > > class A(object): > def spam(self, a): print(f'A: {a}') > class B(A): > def spam(self, b): print(f'B: {b}') > class C(A): > def spam(self, c): print(f'C: {c}') > class D(B,C): > def spam(self, d): print(f'D: {d}') > > The way to call all parent's functions exactly once (as per MRO) and accept > all arguments and also forbid unknown arguments is to accept **kwargs > everywhere and pass them to super().spam(): > > class A: > def spam(self, a): > print(f'A: {a}') > class B(A): > def spam(self, b, **kwargs): > print(f'B: {b}') > super().spam(**kwargs) > class C(A): > def spam(self, c, **kwargs): > print(f'C: {c}') > super().spam(**kwargs) > class D(B, C): > def spam(self, d, **kwargs): > print(f'D: {d}') > super().spam(**kwargs) > > Let's run this: > > >>> B().spam(a=1, b=2) > B: 2 > A: 1 > >>> D().spam(a=1, b=2, c=3, d=4) > D: 4 > B: 2 > C: 3 > A: 1 > > You may notice that super() in B.spam refers to two different classes, either > A or C, depending on inheritance order in yet undefined classes (as of B's > definition). > > That's why the conclusion that super() in Callback.__init__ refers to object > is wrong. In this example, spam=__init__, A=object, B=Callback and C and D are > not yet written, but theoretically possible classes that could be written by > someone else. Why would they be needed, I don't know, but if someone writes > them, s/he would be out of options to invent new arguments to C.__init__. 
> > Note that super().__init__(*args, **kwargs) when super() refers to object > isn't harmful, and just ensures that args and kwargs are empty (i.e. no > unknown arguments were passed). In fact, this is exactly why object.__init__() > takes no arguments since Python 2.6 [1][2], as you correctly point out in the > commit message. > > I don't think this breaks anything (I very much doubt anyone would need to > write code that would trigger this), nevertheless, as the commit is both > pointless and wrong, and as the original author of libvirtaio I'd like to ask > for this commit to be reverted. If this breaks some static analysis tool, > could you just suppress it for this particular line? > > > [0]f4be03b330
> [1] https://bugs.python.org/issue1683368 > [2] https://docs.python.org/3/whatsnew/2.6.html#porting-to-python-2-6 > (fourth point) > Signed-off-by: Philipp Hahn <hahn@univention.de>
479 lines
16 KiB
Python
479 lines
16 KiB
Python
#
|
|
# libvirtaio -- asyncio adapter for libvirt
|
|
# Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
|
|
#
|
|
# This library is free software; you can redistribute it and/or
|
|
# modify it under the terms of the GNU Lesser General Public
|
|
# License as published by the Free Software Foundation; either
|
|
# version 2.1 of the License, or (at your option) any later version.
|
|
#
|
|
# This library is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
# Lesser General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU Lesser General Public
|
|
# License along with this library; if not, see
|
|
# <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
'''Libvirt event loop implementation using asyncio
|
|
|
|
Register the implementation of default loop:
|
|
|
|
>>> import libvirtaio
|
|
>>> libvirtaio.virEventRegisterAsyncIOImpl()
|
|
|
|
.. seealso::
|
|
https://libvirt.org/html/libvirt-libvirt-event.html
|
|
'''
|
|
|
|
import asyncio
|
|
import itertools
|
|
import logging
|
|
import warnings
|
|
|
|
import libvirt
|
|
|
|
from typing import Any, Callable, Dict, Generator, Optional, TypeVar # noqa F401
|
|
_T = TypeVar('_T')
|
|
|
|
__author__ = 'Wojtek Porczyk <woju@invisiblethingslab.com>'
|
|
__license__ = 'LGPL-2.1+'
|
|
__all__ = [
|
|
'getCurrentImpl',
|
|
'virEventAsyncIOImpl',
|
|
'virEventRegisterAsyncIOImpl',
|
|
]
|
|
|
|
# asyncio.ensure_future() first appeared in Python 3.4.4; older releases only
# offer asyncio.async().  Because 'async' became a reserved keyword in
# Python 3.7, the old name cannot be imported with a plain statement, so both
# names are looked up with getattr().  The fallback lookup is only evaluated
# when 'ensure_future' is unavailable.
ensure_future = (getattr(asyncio, "ensure_future", None)
                 or getattr(asyncio, "async"))
|
|
|
|
|
|
class Callback(object):
    '''Common state of every callback handed over by libvirt

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param cb: the callback itself
    :param opaque: the opaque tuple passed by libvirt

    Every instance takes its ``iden`` from one counter stored on this class,
    so subclasses that do not override ``_iden_counter`` share the numbering.
    '''
    # pylint: disable=too-few-public-methods

    _iden_counter = itertools.count()

    def __init__(self, impl: "virEventAsyncIOImpl", cb: Callable[[int, _T], None], opaque: _T, *args: Any, **kwargs: Any) -> None:
        # Leftover arguments are forwarded on purpose: with cooperative
        # multiple inheritance the next class in the MRO need not be object.
        super().__init__(*args, **kwargs)  # type: ignore
        self.impl, self.cb, self.opaque = impl, cb, opaque
        self.iden = next(self._iden_counter)

    def __repr__(self) -> str:
        name = type(self).__name__
        return '<{} iden={}>'.format(name, self.iden)

    def close(self) -> None:
        '''Ask the implementation to schedule the *ff* callback'''
        iden = self.iden
        self.impl.log.debug('callback %d close(), scheduling ff', iden)
        self.impl.schedule_ff_callback(iden, self.opaque)
|
|
|
|
|
|
#
|
|
# file descriptors
|
|
#
|
|
|
|
class Descriptor(object):
    '''Manager of one file descriptor

    :param virEventAsyncIOImpl impl: the implementation in which we run
    :param int fd: the file descriptor

    Keeps the callbacks registered for *fd*, keyed by their ``iden``, and
    registers or unregisters *fd* as reader/writer at ``impl.loop`` according
    to the events those callbacks ask for.
    '''
    def __init__(self, impl: "virEventAsyncIOImpl", fd: int) -> None:
        self.impl = impl
        self.fd = fd
        self.callbacks = {}  # type: Dict[int, FDCallback]

    def _handle(self, event: int) -> None:
        '''Dispatch the event to the callbacks interested in it

        :param int event: The event (from libvirt's constants) being dispatched
        '''
        # Iterate over a snapshot so that self.callbacks may be modified
        # while the callbacks run.
        for callback in list(self.callbacks.values()):
            if callback.event is not None and callback.event & event:
                callback.cb(callback.iden, self.fd, event, callback.opaque)

    def update(self) -> None:
        '''Register or unregister callbacks at event loop

        This should be called after change of any ``.event`` in callbacks.
        A :class:`UserWarning` is issued if any callback asks for an event
        other than VIR_EVENT_HANDLE_READABLE or VIR_EVENT_HANDLE_WRITABLE.
        '''
        readable = libvirt.VIR_EVENT_HANDLE_READABLE
        writable = libvirt.VIR_EVENT_HANDLE_WRITABLE

        # Union of the events requested by all callbacks. It stays 0 when
        # there are no callbacks, which unregisters both reader and writer.
        wanted = 0
        for callback in self.callbacks.values():
            wanted |= callback.event

        if wanted & ~(readable | writable):
            warnings.warn(
                'The only event supported are VIR_EVENT_HANDLE_READABLE '
                'and VIR_EVENT_HANDLE_WRITABLE',
                UserWarning)

        # It seems like loop.add_{reader,writer} can be run multiple times
        # and will still register the callback only once. Likewise,
        # remove_{reader,writer} may be run even if the reader/writer
        # is not registered (and will just return False).
        loop = self.impl.loop

        if wanted & readable:
            loop.add_reader(self.fd, self._handle, readable)
        else:
            loop.remove_reader(self.fd)

        if wanted & writable:
            loop.add_writer(self.fd, self._handle, writable)
        else:
            loop.remove_writer(self.fd)

    def add_handle(self, callback: "FDCallback") -> None:
        '''Add a callback to the descriptor

        :param FDCallback callback: the callback to add
        :rtype: None

        After adding the callback, it is immediately watched.
        '''
        self.callbacks[callback.iden] = callback
        self.update()

    def remove_handle(self, iden: int) -> "FDCallback":
        '''Remove a callback from the descriptor

        :param int iden: the identifier of the callback
        :returns: the callback
        :rtype: FDCallback
        :raises KeyError: if no callback with this *iden* is registered

        After removing the callback, the descriptor may be unwatched, if there
        are no more handles for it.
        '''
        callback = self.callbacks.pop(iden)
        self.update()
        return callback
|
|
|
|
|
|
class DescriptorDict(dict):
    '''Mapping from file descriptor numbers to :class:`Descriptor` objects

    This is used internally by virEventAsyncIOImpl to hold descriptors.
    Looking up a number that has no entry yet creates a Descriptor for it,
    stores it and returns it.
    '''
    def __init__(self, impl: "virEventAsyncIOImpl") -> None:
        self.impl = impl
        super().__init__()

    def __missing__(self, fd: int) -> Descriptor:
        # Only reached when *fd* has no entry, so setdefault() always stores
        # and hands back the freshly built Descriptor.
        return self.setdefault(fd, Descriptor(self.impl, fd))
|
|
|
|
|
|
class FDCallback(Callback):
    '''Callback watching a file descriptor

    :param Descriptor descriptor: the descriptor manager
    :param int event: bitset of events on which to fire the callback

    *descriptor* and *event* are keyword-only; every other argument is passed
    on to the parent class.
    '''
    # pylint: disable=too-few-public-methods

    def __init__(self, *args: Any, descriptor: Descriptor, event: int, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.event = event
        self.descriptor = descriptor

    def __repr__(self) -> str:
        fields = (type(self).__name__, self.iden, self.descriptor.fd, self.event)
        return '<{} iden={} fd={} event={}>'.format(*fields)

    def update(self, event: int) -> None:
        '''Store the new event bitset and have the descriptor fix its watchers'''
        self.event = event
        self.descriptor.update()
|
|
|
|
|
|
#
|
|
# timeouts
|
|
#
|
|
|
|
class TimeoutCallback(Callback):
    '''Callback for timer

    The timer itself is the :meth:`_timer` coroutine, scheduled on
    ``impl.loop`` while the timeout is non-negative. A negative timeout
    means the timer is stopped.
    '''
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.timeout = -1
        # Task running _timer(), or None while the timer is stopped
        self._task = None

    def __repr__(self) -> str:
        return '<{} iden={} timeout={}>'.format(
            self.__class__.__name__, self.iden, self.timeout)

    async def _timer(self) -> None:
        '''An actual timer running on the event loop.

        This is a coroutine. It calls the callback after every elapsed
        timeout until the task running it is cancelled.
        '''
        # NOTE: this is a native coroutine because the generator-based
        # @asyncio.coroutine decorator was removed in Python 3.11.
        while True:
            try:
                if self.timeout > 0:
                    # self.timeout is in milliseconds, sleep() wants seconds
                    timeout = self.timeout * 1e-3
                    self.impl.log.debug('sleeping %r', timeout)
                    await asyncio.sleep(timeout)
                else:
                    # scheduling timeout for next loop iteration
                    await asyncio.sleep(0)

            except asyncio.CancelledError:
                self.impl.log.debug('timer %d cancelled', self.iden)
                break

            self.cb(self.iden, self.opaque)
            self.impl.log.debug('timer %r callback ended', self.iden)

    def update(self, timeout: int) -> None:
        '''Start or stop the timer, possibly updating timeout

        :param int timeout: the new timeout in ms; negative stops the timer

        A timer that is already running is not restarted; it reads the new
        timeout on its next iteration.
        '''
        self.timeout = timeout

        if self.timeout >= 0 and self._task is None:
            self.impl.log.debug('timer %r start', self.iden)
            self._task = ensure_future(self._timer(),
                                       loop=self.impl.loop)

        elif self.timeout < 0 and self._task is not None:
            self.impl.log.debug('timer %r stop', self.iden)
            self._task.cancel()  # pylint: disable=no-member
            self._task = None

    def close(self) -> None:
        '''Stop the timer and call ff callback'''
        self.update(timeout=-1)
        super().close()
|
|
|
|
|
|
#
|
|
# main implementation
|
|
#
|
|
|
|
class virEventAsyncIOImpl(object):
    '''Libvirt event adapter to asyncio.

    :param loop: asyncio's event loop

    If *loop* is not specified, the current (or default) event loop is used.
    '''

    def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        self.loop = loop or asyncio.get_event_loop()
        self.callbacks = {}  # type: Dict[int, Callback]
        self.descriptors = DescriptorDict(self)
        self.log = logging.getLogger(self.__class__.__name__)

        # NOTE invariant: _finished.is_set() iff _pending == 0
        self._pending = 0
        try:
            # Older Pythons bind the Event to a loop at construction time,
            # so it has to be told about a non-default loop.
            self._finished = asyncio.Event(loop=self.loop)
        except TypeError:
            # Python >= 3.10 no longer accepts the 'loop' argument.
            self._finished = asyncio.Event()
        self._finished.set()

    def __repr__(self) -> str:
        return '<{} callbacks={} descriptors={}>'.format(
            type(self).__name__, self.callbacks, self.descriptors)

    def _pending_inc(self) -> None:
        '''Increase the count of pending affairs. Do not use directly.'''
        self._pending += 1
        self._finished.clear()

    def _pending_dec(self) -> None:
        '''Decrease the count of pending affairs. Do not use directly.'''
        assert self._pending > 0
        self._pending -= 1
        if self._pending == 0:
            self._finished.set()

    def register(self) -> "virEventAsyncIOImpl":
        '''Register this instance as event loop implementation

        :returns: this instance, to allow chaining
        '''
        # pylint: disable=bad-whitespace
        self.log.debug('register()')
        libvirt.virEventRegisterImpl(
            self._add_handle, self._update_handle, self._remove_handle,
            self._add_timeout, self._update_timeout, self._remove_timeout)
        return self

    def schedule_ff_callback(self, iden: int, opaque: _T) -> None:
        '''Schedule a ff callback from one of the handles or timers'''
        ensure_future(self._ff_callback(iden, opaque), loop=self.loop)

    async def _ff_callback(self, iden: int, opaque: _T) -> None:
        '''Directly free the opaque object

        This is a coroutine.
        '''
        self.log.debug('ff_callback(iden=%d, opaque=...)', iden)
        libvirt.virEventInvokeFreeCallback(opaque)
        self._pending_dec()

    async def drain(self) -> None:
        '''Wait for the implementation to become idle.

        This is a coroutine.
        '''
        self.log.debug('drain()')
        if self._pending:
            await self._finished.wait()
        self.log.debug('drain ended')

    def is_idle(self) -> bool:
        '''Returns False if there are leftovers from a connection

        Those may happen if there are semantic problems while closing
        a connection. For example, not deregistered events before .close().
        '''
        return not self.callbacks and not self._pending

    def _add_handle(self, fd: int, event: int, cb: libvirt._EventCB, opaque: _T) -> int:
        '''Register a callback for monitoring file handle events

        :param int fd: file descriptor to listen on
        :param int event: bitset of events on which to fire the callback
        :param cb: the callback to be called when an event occurs
        :param opaque: user data to pass to the callback
        :rtype: int
        :returns: handle watch number to be used for updating and unregistering for events

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddHandleFuncFunc
        '''
        descriptor = self.descriptors[fd]
        callback = FDCallback(self, cb, opaque,
                              descriptor=descriptor, event=event)
        assert callback.iden not in self.callbacks

        self.log.debug('add_handle(fd=%d, event=%d, cb=..., opaque=...) = %d',
                       fd, event, callback.iden)
        self.callbacks[callback.iden] = callback
        descriptor.add_handle(callback)
        self._pending_inc()
        return callback.iden

    def _update_handle(self, watch: int, event: int) -> None:
        '''Change event set for a monitored file handle

        :param int watch: file descriptor watch to modify
        :param int event: new events to listen on

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateHandleFunc
        '''
        self.log.debug('update_handle(watch=%d, event=%d)', watch, event)
        callback = self.callbacks[watch]
        assert isinstance(callback, FDCallback)
        callback.update(event=event)

    def _remove_handle(self, watch: int) -> int:
        '''Unregister a callback from a file handle.

        :param int watch: file descriptor watch to stop listening on
        :returns: -1 on error, 0 on success

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveHandleFunc
        '''
        self.log.debug('remove_handle(watch=%d)', watch)
        try:
            callback = self.callbacks.pop(watch)
        except KeyError as err:
            self.log.warning('remove_handle(): no such handle: %r', err.args[0])
            return -1
        assert isinstance(callback, FDCallback)
        fd = callback.descriptor.fd
        descriptor = self.descriptors[fd]
        # The removal must happen outside of the assert statement: asserts
        # are stripped under ``python -O`` and the handle would stay watched.
        removed = descriptor.remove_handle(watch)
        assert callback is removed
        if not descriptor.callbacks:
            del self.descriptors[fd]
        callback.close()
        return 0

    def _add_timeout(self, timeout: int, cb: libvirt._TimerCB, opaque: _T) -> int:
        '''Register a callback for a timer event

        :param int timeout: the timeout to monitor
        :param cb: the callback to call when timeout has expired
        :param opaque: user data to pass to the callback
        :rtype: int
        :returns: a timer value

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventAddTimeoutFunc
        '''
        callback = TimeoutCallback(self, cb, opaque)
        assert callback.iden not in self.callbacks

        self.log.debug('add_timeout(timeout=%d, cb=..., opaque=...) = %d',
                       timeout, callback.iden)
        self.callbacks[callback.iden] = callback
        callback.update(timeout=timeout)
        self._pending_inc()
        return callback.iden

    def _update_timeout(self, timer: int, timeout: int) -> None:
        '''Change frequency for a timer

        :param int timer: the timer to modify
        :param int timeout: the new timeout value in ms

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventUpdateTimeoutFunc
        '''
        self.log.debug('update_timeout(timer=%d, timeout=%d)', timer, timeout)
        callback = self.callbacks[timer]
        assert isinstance(callback, TimeoutCallback)
        callback.update(timeout=timeout)

    def _remove_timeout(self, timer: int) -> int:
        '''Unregister a callback for a timer

        :param int timer: the timer to remove
        :returns: -1 on error, 0 on success

        .. seealso::
            https://libvirt.org/html/libvirt-libvirt-event.html#virEventRemoveTimeoutFunc
        '''
        self.log.debug('remove_timeout(timer=%d)', timer)
        try:
            callback = self.callbacks.pop(timer)
        except KeyError as err:
            self.log.warning('remove_timeout(): no such timeout: %r', err.args[0])
            return -1
        callback.close()
        return 0
|
|
|
|
|
|
# Implementation most recently installed by virEventRegisterAsyncIOImpl()
_current_impl = None  # type: Optional[virEventAsyncIOImpl]


def getCurrentImpl() -> Optional[virEventAsyncIOImpl]:
    '''Give access to the implementation registered through this module

    :returns: the implementation object, or None if not yet registered
    '''
    return _current_impl
|
|
|
|
|
|
def virEventRegisterAsyncIOImpl(loop: asyncio.AbstractEventLoop = None) -> virEventAsyncIOImpl:
    '''Make libvirt dispatch its callbacks through an asyncio event loop

    :param loop: the event loop, handed over to virEventAsyncIOImpl

    A new virEventAsyncIOImpl is created, registered and remembered as the
    current implementation of this module. The object is returned as well,
    but in normal usage it can safely be discarded.
    '''
    global _current_impl
    impl = virEventAsyncIOImpl(loop=loop)
    _current_impl = impl.register()
    return _current_impl
|