From f70939fc3e2882aa55ad358617d4eaad7b5f5291 Mon Sep 17 00:00:00 2001 From: Wojtek Porczyk Date: Thu, 14 Sep 2017 02:41:12 +0200 Subject: [PATCH] libvirtaio: add .drain() coroutine The intended use is to ensure that the implementation is empty, which is one way to ensure that all connections were properly closed and file descriptors reclaimed. Signed-off-by: Wojtek Porczyk --- libvirtaio.py | 36 ++++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/libvirtaio.py b/libvirtaio.py index 97a7f6c..1c432dd 100644 --- a/libvirtaio.py +++ b/libvirtaio.py @@ -269,10 +269,27 @@ class virEventAsyncIOImpl(object): self.descriptors = DescriptorDict(self) self.log = logging.getLogger(self.__class__.__name__) + # NOTE invariant: _finished.is_set() iff _pending == 0 + self._pending = 0 + self._finished = asyncio.Event(loop=loop) + self._finished.set() + def __repr__(self): return '<{} callbacks={} descriptors={}>'.format( type(self).__name__, self.callbacks, self.descriptors) + def _pending_inc(self): + '''Increase the count of pending affairs. Do not use directly.''' + self._pending += 1 + self._finished.clear() + + def _pending_dec(self): + '''Decrease the count of pending affairs. Do not use directly.''' + assert self._pending > 0 + self._pending -= 1 + if self._pending == 0: + self._finished.set() + def register(self): '''Register this instance as event loop implementation''' # pylint: disable=bad-whitespace @@ -293,7 +310,20 @@ class virEventAsyncIOImpl(object): This is a coroutine. ''' self.log.debug('ff_callback(iden=%d, opaque=...)', iden) - return libvirt.virEventInvokeFreeCallback(opaque) + ret = libvirt.virEventInvokeFreeCallback(opaque) + self._pending_dec() + return ret + + @asyncio.coroutine + def drain(self): + '''Wait for the implementation to become idle. + + This is a coroutine. + ''' + self.log.debug('drain()') + if self._pending: + yield from self._finished.wait() + self.log.debug('drain ended') def is_idle(self): '''Returns False if there are leftovers from a connection @@ -301,7 +331,7 @@ class virEventAsyncIOImpl(object): Those may happen if there are sematical problems while closing a connection. For example, not deregistered events before .close(). ''' - return not self.callbacks + return not self.callbacks and not self._pending def _add_handle(self, fd, event, cb, opaque): '''Register a callback for monitoring file handle events @@ -324,6 +354,7 @@ class virEventAsyncIOImpl(object): fd, event, callback.iden) self.callbacks[callback.iden] = callback self.descriptors[fd].add_handle(callback) + self._pending_inc() return callback.iden def _update_handle(self, watch, event): @@ -378,6 +409,7 @@ class virEventAsyncIOImpl(object): timeout, callback.iden) self.callbacks[callback.iden] = callback callback.update(timeout=timeout) + self._pending_inc() return callback.iden def _update_timeout(self, timer, timeout):