2017-03-17 16:35:53 +03:00
#
# libvirtaio -- asyncio adapter for libvirt
# Copyright (C) 2017 Wojtek Porczyk <woju@invisiblethingslab.com>
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, see
# <http://www.gnu.org/licenses/>.
#
''' Libvirt event loop implementation using asyncio
Register the implementation of default loop :
>> > import libvirtaio
>> > libvirtaio . virEventRegisterAsyncIOImpl ( )
. . seealso : :
https : / / libvirt . org / html / libvirt - libvirt - event . html
'''
2018-11-16 15:55:56 +03:00
import asyncio
import itertools
import logging
import warnings
import libvirt
2017-03-17 16:35:53 +03:00
__author__ = ' Wojtek Porczyk <woju@invisiblethingslab.com> '
__license__ = ' LGPL-2.1+ '
2017-09-14 03:29:29 +03:00
__all__ = [
' getCurrentImpl ' ,
' virEventAsyncIOImpl ' ,
' virEventRegisterAsyncIOImpl ' ,
]
2017-03-17 16:35:53 +03:00
2018-06-25 21:14:57 +03:00
# Python < 3.4.4 doesn't have 'ensure_future', so we have to fall
# back to 'async'; however, since 'async' is a reserved keyword
# in Python >= 3.7, we can't perform a straightforward import and
# we have to resort to getattr() instead
ensure_future = getattr ( asyncio , " ensure_future " , None )
if not ensure_future :
ensure_future = getattr ( asyncio , " async " )
2017-03-17 16:35:53 +03:00
class Callback ( object ) :
''' Base class for holding callback
: param virEventAsyncIOImpl impl : the implementation in which we run
: param cb : the callback itself
: param opaque : the opaque tuple passed by libvirt
'''
# pylint: disable=too-few-public-methods
_iden_counter = itertools . count ( )
2020-04-20 11:30:52 +03:00
def __init__ ( self , impl , cb , opaque ) :
2017-03-17 16:35:53 +03:00
self . iden = next ( self . _iden_counter )
self . impl = impl
self . cb = cb
self . opaque = opaque
def __repr__ ( self ) :
return ' < {} iden= {} > ' . format ( self . __class__ . __name__ , self . iden )
def close ( self ) :
''' Schedule *ff* callback '''
self . impl . log . debug ( ' callback %d close(), scheduling ff ' , self . iden )
2017-08-24 22:22:49 +03:00
self . impl . schedule_ff_callback ( self . iden , self . opaque )
2017-03-17 16:35:53 +03:00
2018-11-20 10:34:48 +03:00
2017-03-17 16:35:53 +03:00
#
# file descriptors
#
class Descriptor ( object ) :
''' Manager of one file descriptor
: param virEventAsyncIOImpl impl : the implementation in which we run
: param int fd : the file descriptor
'''
def __init__ ( self , impl , fd ) :
self . impl = impl
self . fd = fd
self . callbacks = { }
def _handle ( self , event ) :
''' Dispatch the event to the descriptors
: param int event : The event ( from libvirt ' s constants) being dispatched
'''
2017-08-31 22:20:40 +03:00
for callback in list ( self . callbacks . values ( ) ) :
2017-03-17 16:35:53 +03:00
if callback . event is not None and callback . event & event :
callback . cb ( callback . iden , self . fd , event , callback . opaque )
def update ( self ) :
''' Register or unregister callbacks at event loop
This should be called after change of any ` ` . event ` ` in callbacks .
'''
# It seems like loop.add_{reader,writer} can be run multiple times
# and will still register the callback only once. Likewise,
# remove_{reader,writer} may be run even if the reader/writer
# is not registered (and will just return False).
# For the edge case of empty callbacks, any() returns False.
if any ( callback . event & ~ (
2018-11-20 10:34:48 +03:00
libvirt . VIR_EVENT_HANDLE_READABLE |
libvirt . VIR_EVENT_HANDLE_WRITABLE )
2017-03-17 16:35:53 +03:00
for callback in self . callbacks . values ( ) ) :
warnings . warn (
' The only event supported are VIR_EVENT_HANDLE_READABLE '
' and VIR_EVENT_HANDLE_WRITABLE ' ,
UserWarning )
if any ( callback . event & libvirt . VIR_EVENT_HANDLE_READABLE
for callback in self . callbacks . values ( ) ) :
self . impl . loop . add_reader (
self . fd , self . _handle , libvirt . VIR_EVENT_HANDLE_READABLE )
else :
self . impl . loop . remove_reader ( self . fd )
if any ( callback . event & libvirt . VIR_EVENT_HANDLE_WRITABLE
for callback in self . callbacks . values ( ) ) :
self . impl . loop . add_writer (
self . fd , self . _handle , libvirt . VIR_EVENT_HANDLE_WRITABLE )
else :
self . impl . loop . remove_writer ( self . fd )
def add_handle ( self , callback ) :
''' Add a callback to the descriptor
: param FDCallback callback : the callback to add
: rtype : None
After adding the callback , it is immediately watched .
'''
self . callbacks [ callback . iden ] = callback
self . update ( )
def remove_handle ( self , iden ) :
''' Remove a callback from the descriptor
: param int iden : the identifier of the callback
: returns : the callback
: rtype : FDCallback
After removing the callback , the descriptor may be unwatched , if there
are no more handles for it .
'''
callback = self . callbacks . pop ( iden )
self . update ( )
return callback
2018-11-20 10:34:48 +03:00
2017-03-17 16:35:53 +03:00
class DescriptorDict ( dict ) :
''' Descriptors collection
This is used internally by virEventAsyncIOImpl to hold descriptors .
'''
def __init__ ( self , impl ) :
super ( ) . __init__ ( )
self . impl = impl
def __missing__ ( self , fd ) :
descriptor = Descriptor ( self . impl , fd )
self [ fd ] = descriptor
return descriptor
2018-11-20 10:34:48 +03:00
2017-03-17 16:35:53 +03:00
class FDCallback ( Callback ) :
''' Callback for file descriptor (watcher)
: param Descriptor descriptor : the descriptor manager
: param int event : bitset of events on which to fire the callback
'''
# pylint: disable=too-few-public-methods
def __init__ ( self , * args , descriptor , event , * * kwargs ) :
super ( ) . __init__ ( * args , * * kwargs )
self . descriptor = descriptor
self . event = event
def __repr__ ( self ) :
return ' < {} iden= {} fd= {} event= {} > ' . format (
self . __class__ . __name__ , self . iden , self . descriptor . fd , self . event )
def update ( self , event ) :
''' Update the callback and fix descriptor ' s watchers '''
self . event = event
self . descriptor . update ( )
2018-11-20 10:34:48 +03:00
2017-03-17 16:35:53 +03:00
#
# timeouts
#
class TimeoutCallback ( Callback ) :
''' Callback for timer '''
def __init__ ( self , * args , * * kwargs ) :
super ( ) . __init__ ( * args , * * kwargs )
self . timeout = - 1
self . _task = None
def __repr__ ( self ) :
return ' < {} iden= {} timeout= {} > ' . format (
self . __class__ . __name__ , self . iden , self . timeout )
@asyncio.coroutine
def _timer ( self ) :
''' An actual timer running on the event loop.
This is a coroutine .
'''
while True :
try :
if self . timeout > 0 :
timeout = self . timeout * 1e-3
self . impl . log . debug ( ' sleeping %r ' , timeout )
yield from asyncio . sleep ( timeout )
else :
# scheduling timeout for next loop iteration
yield
except asyncio . CancelledError :
self . impl . log . debug ( ' timer %d cancelled ' , self . iden )
break
self . cb ( self . iden , self . opaque )
self . impl . log . debug ( ' timer %r callback ended ' , self . iden )
def update ( self , timeout ) :
''' Start or the timer, possibly updating timeout '''
self . timeout = timeout
if self . timeout > = 0 and self . _task is None :
self . impl . log . debug ( ' timer %r start ' , self . iden )
self . _task = ensure_future ( self . _timer ( ) ,
2018-11-20 10:34:48 +03:00
loop = self . impl . loop )
2017-03-17 16:35:53 +03:00
elif self . timeout < 0 and self . _task is not None :
self . impl . log . debug ( ' timer %r stop ' , self . iden )
self . _task . cancel ( ) # pylint: disable=no-member
self . _task = None
def close ( self ) :
''' Stop the timer and call ff callback '''
self . update ( timeout = - 1 )
2017-09-14 03:26:53 +03:00
super ( TimeoutCallback , self ) . close ( )
2017-03-17 16:35:53 +03:00
2018-11-20 10:34:48 +03:00
2017-03-17 16:35:53 +03:00
#
# main implementation
#
class virEventAsyncIOImpl ( object ) :
''' Libvirt event adapter to asyncio.
: param loop : asyncio ' s event loop
If * loop * is not specified , the current ( or default ) event loop is used .
'''
def __init__ ( self , loop = None ) :
self . loop = loop or asyncio . get_event_loop ( )
self . callbacks = { }
self . descriptors = DescriptorDict ( self )
self . log = logging . getLogger ( self . __class__ . __name__ )
2017-09-14 03:41:12 +03:00
# NOTE invariant: _finished.is_set() iff _pending == 0
self . _pending = 0
self . _finished = asyncio . Event ( loop = loop )
self . _finished . set ( )
2017-08-24 22:22:49 +03:00
def __repr__ ( self ) :
return ' < {} callbacks= {} descriptors= {} > ' . format (
type ( self ) . __name__ , self . callbacks , self . descriptors )
2017-09-14 03:41:12 +03:00
def _pending_inc ( self ) :
''' Increase the count of pending affairs. Do not use directly. '''
self . _pending + = 1
self . _finished . clear ( )
def _pending_dec ( self ) :
''' Decrease the count of pending affairs. Do not use directly. '''
assert self . _pending > 0
self . _pending - = 1
if self . _pending == 0 :
self . _finished . set ( )
2017-03-17 16:35:53 +03:00
def register ( self ) :
''' Register this instance as event loop implementation '''
# pylint: disable=bad-whitespace
self . log . debug ( ' register() ' )
libvirt . virEventRegisterImpl (
2018-11-20 10:34:48 +03:00
self . _add_handle , self . _update_handle , self . _remove_handle ,
2017-03-17 16:35:53 +03:00
self . _add_timeout , self . _update_timeout , self . _remove_timeout )
return self
2017-08-24 22:22:49 +03:00
def schedule_ff_callback ( self , iden , opaque ) :
2017-03-17 16:35:53 +03:00
''' Schedule a ff callback from one of the handles or timers '''
2017-08-24 22:22:49 +03:00
ensure_future ( self . _ff_callback ( iden , opaque ) , loop = self . loop )
@asyncio.coroutine
def _ff_callback ( self , iden , opaque ) :
''' Directly free the opaque object
This is a coroutine .
'''
self . log . debug ( ' ff_callback(iden= %d , opaque=...) ' , iden )
2020-04-20 12:51:48 +03:00
libvirt . virEventInvokeFreeCallback ( opaque )
2017-09-14 03:41:12 +03:00
self . _pending_dec ( )
@asyncio.coroutine
def drain ( self ) :
''' Wait for the implementation to become idle.
This is a coroutine .
'''
self . log . debug ( ' drain() ' )
if self . _pending :
yield from self . _finished . wait ( )
self . log . debug ( ' drain ended ' )
2017-03-17 16:35:53 +03:00
def is_idle ( self ) :
''' Returns False if there are leftovers from a connection
Those may happen if there are sematical problems while closing
a connection . For example , not deregistered events before . close ( ) .
'''
2017-09-14 03:41:12 +03:00
return not self . callbacks and not self . _pending
2017-03-17 16:35:53 +03:00
def _add_handle ( self , fd , event , cb , opaque ) :
''' Register a callback for monitoring file handle events
: param int fd : file descriptor to listen on
: param int event : bitset of events on which to fire the callback
: param cb : the callback to be called when an event occurrs
: param opaque : user data to pass to the callback
: rtype : int
: returns : handle watch number to be used for updating and unregistering for events
. . seealso : :
https : / / libvirt . org / html / libvirt - libvirt - event . html #virEventAddHandleFuncFunc
'''
callback = FDCallback ( self , cb , opaque ,
2018-11-20 10:34:48 +03:00
descriptor = self . descriptors [ fd ] , event = event )
2017-09-14 03:24:29 +03:00
assert callback . iden not in self . callbacks
2017-08-24 22:22:49 +03:00
self . log . debug ( ' add_handle(fd= %d , event= %d , cb=..., opaque=...) = %d ' ,
2018-11-20 10:34:48 +03:00
fd , event , callback . iden )
2017-03-17 16:35:53 +03:00
self . callbacks [ callback . iden ] = callback
self . descriptors [ fd ] . add_handle ( callback )
2017-09-14 03:41:12 +03:00
self . _pending_inc ( )
2017-03-17 16:35:53 +03:00
return callback . iden
def _update_handle ( self , watch , event ) :
''' Change event set for a monitored file handle
: param int watch : file descriptor watch to modify
: param int event : new events to listen on
. . seealso : :
https : / / libvirt . org / html / libvirt - libvirt - event . html #virEventUpdateHandleFunc
'''
self . log . debug ( ' update_handle(watch= %d , event= %d ) ' , watch , event )
2020-04-20 12:58:17 +03:00
callback = self . callbacks [ watch ]
assert isinstance ( callback , FDCallback )
callback . update ( event = event )
2017-03-17 16:35:53 +03:00
def _remove_handle ( self , watch ) :
''' Unregister a callback from a file handle.
: param int watch : file descriptor watch to stop listening on
2020-04-27 10:44:57 +03:00
: returns : - 1 on error , 0 on success
2017-03-17 16:35:53 +03:00
. . seealso : :
https : / / libvirt . org / html / libvirt - libvirt - event . html #virEventRemoveHandleFunc
'''
self . log . debug ( ' remove_handle(watch= %d ) ' , watch )
2017-08-24 22:22:49 +03:00
try :
callback = self . callbacks . pop ( watch )
except KeyError as err :
self . log . warning ( ' remove_handle(): no such handle: %r ' , err . args [ 0 ] )
2020-04-27 10:44:57 +03:00
return - 1
2020-04-20 12:58:17 +03:00
assert isinstance ( callback , FDCallback )
2017-03-17 16:35:53 +03:00
fd = callback . descriptor . fd
assert callback is self . descriptors [ fd ] . remove_handle ( watch )
if len ( self . descriptors [ fd ] . callbacks ) == 0 :
del self . descriptors [ fd ]
callback . close ( )
2020-04-27 10:44:57 +03:00
return 0
2017-03-17 16:35:53 +03:00
def _add_timeout ( self , timeout , cb , opaque ) :
''' Register a callback for a timer event
: param int timeout : the timeout to monitor
: param cb : the callback to call when timeout has expired
: param opaque : user data to pass to the callback
: rtype : int
: returns : a timer value
. . seealso : :
https : / / libvirt . org / html / libvirt - libvirt - event . html #virEventAddTimeoutFunc
'''
callback = TimeoutCallback ( self , cb , opaque )
2017-09-14 03:24:29 +03:00
assert callback . iden not in self . callbacks
2017-08-24 22:22:49 +03:00
self . log . debug ( ' add_timeout(timeout= %d , cb=..., opaque=...) = %d ' ,
2018-11-20 10:34:48 +03:00
timeout , callback . iden )
2017-03-17 16:35:53 +03:00
self . callbacks [ callback . iden ] = callback
callback . update ( timeout = timeout )
2017-09-14 03:41:12 +03:00
self . _pending_inc ( )
2017-03-17 16:35:53 +03:00
return callback . iden
def _update_timeout ( self , timer , timeout ) :
''' Change frequency for a timer
: param int timer : the timer to modify
: param int timeout : the new timeout value in ms
. . seealso : :
https : / / libvirt . org / html / libvirt - libvirt - event . html #virEventUpdateTimeoutFunc
'''
self . log . debug ( ' update_timeout(timer= %d , timeout= %d ) ' , timer , timeout )
2020-04-20 12:58:17 +03:00
callback = self . callbacks [ timer ]
assert isinstance ( callback , TimeoutCallback )
callback . update ( timeout = timeout )
2017-03-17 16:35:53 +03:00
def _remove_timeout ( self , timer ) :
''' Unregister a callback for a timer
: param int timer : the timer to remove
2020-04-27 10:44:57 +03:00
: returns : - 1 on error , 0 on success
2017-03-17 16:35:53 +03:00
. . seealso : :
https : / / libvirt . org / html / libvirt - libvirt - event . html #virEventRemoveTimeoutFunc
'''
self . log . debug ( ' remove_timeout(timer= %d ) ' , timer )
2020-04-27 10:44:57 +03:00
try :
callback = self . callbacks . pop ( timer )
except KeyError as err :
self . log . warning ( ' remove_timeout(): no such timeout: %r ' , err . args [ 0 ] )
return - 1
2017-03-17 16:35:53 +03:00
callback . close ( )
2020-04-27 10:44:57 +03:00
return 0
2017-03-17 16:35:53 +03:00
2017-09-14 03:29:29 +03:00
_current_impl = None
2018-11-20 10:34:48 +03:00
2017-09-14 03:29:29 +03:00
def getCurrentImpl ( ) :
''' Return the current implementation, or None if not yet registered '''
return _current_impl
2018-11-20 10:34:48 +03:00
2017-03-17 16:35:53 +03:00
def virEventRegisterAsyncIOImpl ( loop = None ) :
''' Arrange for libvirt ' s callbacks to be dispatched via asyncio event loop
The implementation object is returned , but in normal usage it can safely be
discarded .
'''
2017-09-14 03:29:29 +03:00
global _current_impl
_current_impl = virEventAsyncIOImpl ( loop = loop ) . register ( )
return _current_impl