1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-01-04 09:18:36 +03:00
lvm2/test/dbus/testlib.py
Tony Asleson eacff5c189 lvmdbustest: Print messages if timeout value > 10%
We will dump some informational messages if the time to return when we
specify a timeout exceeds 10% of requested.
2016-12-20 11:06:57 -06:00

299 lines
8.0 KiB
Python

#!/usr/bin/env python3
# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import string
import random
import functools
import xml.etree.ElementTree as Et
from collections import OrderedDict
import dbus
import os
import sys
import time
BUS_NAME = os.getenv('LVM_DBUS_NAME', 'com.redhat.lvmdbus1')
BASE_INTERFACE = 'com.redhat.lvmdbus1'
MANAGER_INT = BASE_INTERFACE + '.Manager'
MANAGER_OBJ = '/' + BASE_INTERFACE.replace('.', '/') + '/Manager'
PV_INT = BASE_INTERFACE + ".Pv"
VG_INT = BASE_INTERFACE + ".Vg"
LV_INT = BASE_INTERFACE + ".Lv"
THINPOOL_INT = BASE_INTERFACE + ".ThinPool"
SNAPSHOT_INT = BASE_INTERFACE + ".Snapshot"
LV_COMMON_INT = BASE_INTERFACE + ".LvCommon"
JOB_INT = BASE_INTERFACE + ".Job"
CACHE_POOL_INT = BASE_INTERFACE + ".CachePool"
CACHE_LV_INT = BASE_INTERFACE + ".CachedLv"
THINPOOL_LV_PATH = '/' + THINPOOL_INT.replace('.', '/')
validate_introspection = True
def rs(length, suffix, character_set=string.ascii_lowercase):
return ''.join(random.choice(character_set) for _ in range(length)) + suffix
def mib(s):
return 1024 * 1024 * s
def std_err_print(*args):
sys.stderr.write(' '.join(map(str, args)) + '\n')
sys.stderr.flush()
class DbusIntrospection(object):
@staticmethod
def introspect(xml_representation):
interfaces = {}
root = Et.fromstring(xml_representation)
for c in root:
if c.tag == "interface":
in_f = c.attrib['name']
interfaces[in_f] = dict(methods=OrderedDict(), properties={})
for nested in c:
if nested.tag == "method":
mn = nested.attrib['name']
interfaces[in_f]['methods'][mn] = OrderedDict()
for arg in nested:
if arg.tag == 'arg':
arg_dir = arg.attrib['direction']
if arg_dir == 'in':
n = arg.attrib['name']
else:
n = 'RETURN_VALUE'
arg_type = arg.attrib['type']
if n:
v = dict(
name=mn,
a_dir=arg_dir,
a_type=arg_type
)
interfaces[in_f]['methods'][mn][n] = v
elif nested.tag == 'property':
pn = nested.attrib['name']
p_access = nested.attrib['access']
p_type = nested.attrib['type']
interfaces[in_f]['properties'][pn] = \
dict(p_access=p_access, p_type=p_type)
else:
pass
# print('Interfaces...')
# for k, v in list(interfaces.items()):
# print('Interface %s' % k)
# if v['methods']:
# for m, args in list(v['methods'].items()):
# print(' method: %s' % m)
# for a, aa in args.items():
# print(' method arg: %s type %s' %
# (a, aa['a_type']))
# if v['properties']:
# for p, d in list(v['properties'].items()):
# print(' Property: %s type= %s' % (p, d['p_type']))
# print('End interfaces')
return interfaces
def btsr(value):
t = type(value)
if t == dbus.Boolean:
return 'b'
elif t == dbus.ObjectPath:
return 'o'
elif t == dbus.String:
return 's'
elif t == dbus.Byte:
return 'y'
elif t == dbus.Int16:
return 'n'
elif t == dbus.Int32:
return 'i'
elif t == dbus.Int64:
return 'x'
elif t == dbus.UInt16:
return 'q'
elif t == dbus.UInt32:
return 'u'
elif t == dbus.UInt64:
return 't'
elif t == dbus.Double:
return 'd'
elif t == dbus.Struct:
rc = '('
for vt in value:
rc += btsr(vt)
rc += ')'
return rc
elif t == dbus.Array:
rc = "a"
for i in value:
rc += btsr(i)
break
return rc
else:
raise RuntimeError("Unhandled type %s" % str(t))
def verify_type(value, dbus_str_rep):
actual_str_rep = btsr(value)
if dbus_str_rep != actual_str_rep:
# print("%s ~= %s" % (dbus_str_rep, actual_str_rep))
# Unless we have a full filled out type we won't match exactly
if not dbus_str_rep.startswith(actual_str_rep):
raise RuntimeError(
"Incorrect type, expected= %s actual = %s object= %s" %
(dbus_str_rep, actual_str_rep, str(type(value))))
class RemoteInterface(object):
def _set_props(self, props=None):
if not props:
for _ in range(0, 3):
try:
prop_interface = dbus.Interface(self.dbus_object,
'org.freedesktop.DBus.Properties')
props = prop_interface.GetAll(self.interface)
break
except dbus.exceptions.DBusException as dbe:
if "GetAll" not in str(dbe):
raise dbe
if props:
for kl, vl in list(props.items()):
# Verify type is correct!
if self.introspect:
verify_type(vl, self.introspect[self.interface]
['properties'][kl]['p_type'])
setattr(self, kl, vl)
@property
def object_path(self):
return self.dbus_object.object_path
def __init__(
self, dbus_object, interface, introspect,
properties=None, timelimit=-1):
self.dbus_object = dbus_object
self.interface = interface
self.introspect = introspect
self.tmo = 0
if timelimit >= 0:
self.tmo = float(timelimit)
self.tmo *= 1.10
self.dbus_interface = dbus.Interface(self.dbus_object, self.interface)
self._set_props(properties)
def __getattr__(self, item):
if hasattr(self.dbus_interface, item):
return functools.partial(self._wrapper, item)
else:
return functools.partial(self, item)
def _wrapper(self, _method_name, *args, **kwargs):
# Lets see how long a method takes to execute, in call cases we should
# return something when the time limit has been reached.
start = time.time()
result = getattr(self.dbus_interface, _method_name)(*args, **kwargs)
end = time.time()
diff = end - start
if self.tmo > 0.0:
if diff > self.tmo:
std_err_print("\n Time exceeded: %f > %f %s" %
(diff, self.tmo, _method_name))
if self.introspect:
if 'RETURN_VALUE' in self.introspect[
self.interface]['methods'][_method_name]:
r_type = self.introspect[
self.interface]['methods'][
_method_name]['RETURN_VALUE']['a_type']
verify_type(result, r_type)
return result
def update(self):
self._set_props()
class ClientProxy(object):
@staticmethod
def _intf_short_name(nm):
return nm.split('.')[-1:][0]
def get_introspect(self):
i = dbus.Interface(
self.dbus_object,
'org.freedesktop.DBus.Introspectable')
return DbusIntrospection.introspect(i.Introspect())
def _common(self, interface, introspect, properties):
short_name = ClientProxy._intf_short_name(interface)
self.short_interface_names.append(short_name)
ro = RemoteInterface(self.dbus_object, interface, introspect,
properties, timelimit=self.tmo)
setattr(self, short_name, ro)
def __init__(self, bus, object_path, interface_prop_hash=None,
interfaces=None, timelimit=-1):
self.object_path = object_path
self.short_interface_names = []
self.tmo = timelimit
self.dbus_object = bus.get_object(
BUS_NAME, self.object_path, introspect=False)
if interface_prop_hash:
assert interfaces is None
if interfaces:
assert interface_prop_hash is None
if interface_prop_hash and not validate_introspection:
# We have everything including the values of the properties
for i, props in interface_prop_hash.items():
self._common(i, None, props)
elif interfaces and not validate_introspection:
# We are retrieving the values of the properties
for i in interfaces:
self._common(i, None, None)
else:
# We need to query the interfaces and gather all the properties
# for each interface, as we have the introspection data we
# will also utilize it to verify what we get back verifies
introspect = self.get_introspect()
if interface_prop_hash:
introspect_interfaces = list(introspect.keys())
for object_manager_key in interface_prop_hash.keys():
assert object_manager_key in introspect_interfaces
for i in list(introspect.keys()):
self._common(i, introspect, None)
def update(self):
# Go through all interfaces and update them
for sn in self.short_interface_names:
getattr(self, sn).update()