1
0
mirror of https://gitlab.com/libvirt/libvirt-python.git synced 2024-10-26 16:25:10 +03:00

libvirtaio: Add manual PEP 484 type annotations

Signed-off-by: Philipp Hahn <hahn@univention.de>
This commit is contained in:
Philipp Hahn 2018-11-20 08:59:35 +01:00
parent 6b97281730
commit 026a47928a

View File

@ -35,6 +35,9 @@ import warnings
import libvirt
from typing import Any, Callable, Dict, Generator, Optional, TypeVar # noqa F401
_T = TypeVar('_T')
__author__ = 'Wojtek Porczyk <woju@invisiblethingslab.com>'
__license__ = 'LGPL-2.1+'
__all__ = [
@ -63,16 +66,16 @@ class Callback(object):
_iden_counter = itertools.count()
def __init__(self, impl, cb, opaque):
def __init__(self, impl: "virEventAsyncIOImpl", cb: Callable[[int, _T], None], opaque: _T, ) -> None:
self.iden = next(self._iden_counter)
self.impl = impl
self.cb = cb
self.opaque = opaque
def __repr__(self):
def __repr__(self) -> str:
return '<{} iden={}>'.format(self.__class__.__name__, self.iden)
def close(self):
def close(self) -> None:
'''Schedule *ff* callback'''
self.impl.log.debug('callback %d close(), scheduling ff', self.iden)
self.impl.schedule_ff_callback(self.iden, self.opaque)
@ -88,12 +91,12 @@ class Descriptor(object):
:param virEventAsyncIOImpl impl: the implementation in which we run
:param int fd: the file descriptor
'''
def __init__(self, impl, fd):
def __init__(self, impl: "virEventAsyncIOImpl", fd: int) -> None:
self.impl = impl
self.fd = fd
self.callbacks = {}
self.callbacks = {} # type: Dict
def _handle(self, event):
def _handle(self, event: int) -> None:
'''Dispatch the event to the descriptors
:param int event: The event (from libvirt's constants) being dispatched
@ -102,7 +105,7 @@ class Descriptor(object):
if callback.event is not None and callback.event & event:
callback.cb(callback.iden, self.fd, event, callback.opaque)
def update(self):
def update(self) -> None:
'''Register or unregister callbacks at event loop
This should be called after change of any ``.event`` in callbacks.
@ -136,7 +139,7 @@ class Descriptor(object):
else:
self.impl.loop.remove_writer(self.fd)
def add_handle(self, callback):
def add_handle(self, callback: "FDCallback") -> None:
'''Add a callback to the descriptor
:param FDCallback callback: the callback to add
@ -147,7 +150,7 @@ class Descriptor(object):
self.callbacks[callback.iden] = callback
self.update()
def remove_handle(self, iden):
def remove_handle(self, iden: int) -> None:
'''Remove a callback from the descriptor
:param int iden: the identifier of the callback
@ -167,11 +170,11 @@ class DescriptorDict(dict):
This is used internally by virEventAsyncIOImpl to hold descriptors.
'''
def __init__(self, impl):
def __init__(self, impl: "virEventAsyncIOImpl") -> None:
super().__init__()
self.impl = impl
def __missing__(self, fd):
def __missing__(self, fd: int) -> Descriptor:
descriptor = Descriptor(self.impl, fd)
self[fd] = descriptor
return descriptor
@ -185,16 +188,16 @@ class FDCallback(Callback):
'''
# pylint: disable=too-few-public-methods
def __init__(self, *args, descriptor, event, **kwargs):
def __init__(self, *args: Any, descriptor: Descriptor, event: int, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.descriptor = descriptor
self.event = event
def __repr__(self):
def __repr__(self) -> str:
return '<{} iden={} fd={} event={}>'.format(
self.__class__.__name__, self.iden, self.descriptor.fd, self.event)
def update(self, event):
def update(self, event: int) -> None:
'''Update the callback and fix descriptor's watchers'''
self.event = event
self.descriptor.update()
@ -206,17 +209,17 @@ class FDCallback(Callback):
class TimeoutCallback(Callback):
'''Callback for timer'''
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.timeout = -1
self._task = None
def __repr__(self):
def __repr__(self) -> str:
return '<{} iden={} timeout={}>'.format(
self.__class__.__name__, self.iden, self.timeout)
@asyncio.coroutine
def _timer(self):
def _timer(self) -> Generator[Any, None, None]:
'''An actual timer running on the event loop.
This is a coroutine.
@ -238,7 +241,7 @@ class TimeoutCallback(Callback):
self.cb(self.iden, self.opaque)
self.impl.log.debug('timer %r callback ended', self.iden)
def update(self, timeout):
def update(self, timeout: int) -> None:
'''Start or the timer, possibly updating timeout'''
self.timeout = timeout
@ -252,7 +255,7 @@ class TimeoutCallback(Callback):
self._task.cancel() # pylint: disable=no-member
self._task = None
def close(self):
def close(self) -> None:
'''Stop the timer and call ff callback'''
self.update(timeout=-1)
super(TimeoutCallback, self).close()
@ -270,9 +273,9 @@ class virEventAsyncIOImpl(object):
If *loop* is not specified, the current (or default) event loop is used.
'''
def __init__(self, loop=None):
def __init__(self, loop: asyncio.AbstractEventLoop = None) -> None:
self.loop = loop or asyncio.get_event_loop()
self.callbacks = {}
self.callbacks = {} # type: Dict[int, Callback]
self.descriptors = DescriptorDict(self)
self.log = logging.getLogger(self.__class__.__name__)
@ -281,23 +284,23 @@ class virEventAsyncIOImpl(object):
self._finished = asyncio.Event(loop=loop)
self._finished.set()
def __repr__(self):
def __repr__(self) -> str:
return '<{} callbacks={} descriptors={}>'.format(
type(self).__name__, self.callbacks, self.descriptors)
def _pending_inc(self):
def _pending_inc(self) -> None:
'''Increase the count of pending affairs. Do not use directly.'''
self._pending += 1
self._finished.clear()
def _pending_dec(self):
def _pending_dec(self) -> None:
'''Decrease the count of pending affairs. Do not use directly.'''
assert self._pending > 0
self._pending -= 1
if self._pending == 0:
self._finished.set()
def register(self):
def register(self) -> "virEventAsyncIOImpl":
'''Register this instance as event loop implementation'''
# pylint: disable=bad-whitespace
self.log.debug('register()')
@ -306,12 +309,12 @@ class virEventAsyncIOImpl(object):
self._add_timeout, self._update_timeout, self._remove_timeout)
return self
def schedule_ff_callback(self, iden, opaque):
def schedule_ff_callback(self, iden: int, opaque: _T) -> None:
'''Schedule a ff callback from one of the handles or timers'''
ensure_future(self._ff_callback(iden, opaque), loop=self.loop)
@asyncio.coroutine
def _ff_callback(self, iden, opaque):
def _ff_callback(self, iden: int, opaque: _T) -> None:
'''Directly free the opaque object
This is a coroutine.
@ -321,7 +324,7 @@ class virEventAsyncIOImpl(object):
self._pending_dec()
@asyncio.coroutine
def drain(self):
def drain(self) -> Generator[Any, None, None]:
'''Wait for the implementation to become idle.
This is a coroutine.
@ -331,7 +334,7 @@ class virEventAsyncIOImpl(object):
yield from self._finished.wait()
self.log.debug('drain ended')
def is_idle(self):
def is_idle(self) -> bool:
'''Returns False if there are leftovers from a connection
Those may happen if there are sematical problems while closing
@ -339,7 +342,7 @@ class virEventAsyncIOImpl(object):
'''
return not self.callbacks and not self._pending
def _add_handle(self, fd, event, cb, opaque):
def _add_handle(self, fd: int, event: int, cb: libvirt._EventCB, opaque: _T) -> int:
'''Register a callback for monitoring file handle events
:param int fd: file descriptor to listen on
@ -363,7 +366,7 @@ class virEventAsyncIOImpl(object):
self._pending_inc()
return callback.iden
def _update_handle(self, watch, event):
def _update_handle(self, watch: int, event: int) -> None:
'''Change event set for a monitored file handle
:param int watch: file descriptor watch to modify
@ -377,7 +380,7 @@ class virEventAsyncIOImpl(object):
assert isinstance(callback, FDCallback)
callback.update(event=event)
def _remove_handle(self, watch):
def _remove_handle(self, watch: int) -> int:
'''Unregister a callback from a file handle.
:param int watch: file descriptor watch to stop listening on
@ -400,7 +403,7 @@ class virEventAsyncIOImpl(object):
callback.close()
return 0
def _add_timeout(self, timeout, cb, opaque):
def _add_timeout(self, timeout: int, cb: libvirt._TimerCB, opaque: _T) -> int:
'''Register a callback for a timer event
:param int timeout: the timeout to monitor
@ -422,7 +425,7 @@ class virEventAsyncIOImpl(object):
self._pending_inc()
return callback.iden
def _update_timeout(self, timer, timeout):
def _update_timeout(self, timer: int, timeout: int) -> None:
'''Change frequency for a timer
:param int timer: the timer to modify
@ -436,7 +439,7 @@ class virEventAsyncIOImpl(object):
assert isinstance(callback, TimeoutCallback)
callback.update(timeout=timeout)
def _remove_timeout(self, timer):
def _remove_timeout(self, timer: int) -> int:
'''Unregister a callback for a timer
:param int timer: the timer to remove
@ -455,15 +458,15 @@ class virEventAsyncIOImpl(object):
return 0
_current_impl = None
_current_impl = None # type: Optional[virEventAsyncIOImpl]
def getCurrentImpl():
def getCurrentImpl() -> Optional[virEventAsyncIOImpl]:
'''Return the current implementation, or None if not yet registered'''
return _current_impl
def virEventRegisterAsyncIOImpl(loop=None):
def virEventRegisterAsyncIOImpl(loop: asyncio.AbstractEventLoop = None) -> virEventAsyncIOImpl:
'''Arrange for libvirt's callbacks to be dispatched via asyncio event loop
The implementation object is returned, but in normal usage it can safely be