1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-01-03 05:18:29 +03:00

lvmdbustest.py: ws fixes

This commit is contained in:
Tony Asleson 2016-10-11 12:22:31 -05:00
parent 9c56902365
commit e57fd9d963
2 changed files with 101 additions and 90 deletions

View File

@ -9,6 +9,7 @@
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# noinspection PyUnresolvedReferences
import dbus import dbus
# noinspection PyUnresolvedReferences # noinspection PyUnresolvedReferences
from dbus.mainloop.glib import DBusGMainLoop from dbus.mainloop.glib import DBusGMainLoop
@ -49,9 +50,10 @@ def lv_n(suffix=None):
def get_objects(): def get_objects():
rc = {MANAGER_INT: [], PV_INT: [], VG_INT: [], LV_INT: [], rc = {
THINPOOL_INT: [], JOB_INT: [], SNAPSHOT_INT: [], LV_COMMON_INT: [], MANAGER_INT: [], PV_INT: [], VG_INT: [], LV_INT: [],
CACHE_POOL_INT: [], CACHE_LV_INT: []} THINPOOL_INT: [], JOB_INT: [], SNAPSHOT_INT: [], LV_COMMON_INT: [],
CACHE_POOL_INT: [], CACHE_LV_INT: []}
manager = dbus.Interface(bus.get_object( manager = dbus.Interface(bus.get_object(
BUSNAME, "/com/redhat/lvmdbus1"), BUSNAME, "/com/redhat/lvmdbus1"),
@ -289,7 +291,7 @@ class TestDbusService(unittest.TestCase):
if len(self.objs[PV_INT]) >= 2: if len(self.objs[PV_INT]) >= 2:
vg = self._vg_create( vg = self._vg_create(
[self.objs[PV_INT][0].object_path, [self.objs[PV_INT][0].object_path,
self.objs[PV_INT][1].object_path]).Vg self.objs[PV_INT][1].object_path]).Vg
path = self.handle_return( path = self.handle_return(
vg.Reduce(False, [vg.Pvs[0]], g_tmo, {}) vg.Reduce(False, [vg.Pvs[0]], g_tmo, {})
@ -338,12 +340,14 @@ class TestDbusService(unittest.TestCase):
for l in lv_paths: for l in lv_paths:
lv_proxy = ClientProxy(self.bus, l).LvCommon lv_proxy = ClientProxy(self.bus, l).LvCommon
self.assertTrue(lv_proxy.Vg == vg.object_path, "%s != %s" % self.assertTrue(
(lv_proxy.Vg, vg.object_path)) lv_proxy.Vg == vg.object_path, "%s != %s" %
(lv_proxy.Vg, vg.object_path))
full_name = "%s/%s" % (new_name, lv_proxy.Name) full_name = "%s/%s" % (new_name, lv_proxy.Name)
lv_path = mgr.LookUpByLvmId(full_name) lv_path = mgr.LookUpByLvmId(full_name)
self.assertTrue(lv_path == lv_proxy.object_path, "%s != %s" % self.assertTrue(
(lv_path, lv_proxy.object_path)) lv_path == lv_proxy.object_path, "%s != %s" %
(lv_path, lv_proxy.object_path))
def _verify_hidden_lookups(self, lv_common_object, vgname): def _verify_hidden_lookups(self, lv_common_object, vgname):
mgr = self.objs[MANAGER_INT][0].Manager mgr = self.objs[MANAGER_INT][0].Manager
@ -392,8 +396,9 @@ class TestDbusService(unittest.TestCase):
full_name = "%s/%s" % (vg_name_start, lv_name) full_name = "%s/%s" % (vg_name_start, lv_name)
lookup_lv_path = mgr.LookUpByLvmId(full_name) lookup_lv_path = mgr.LookUpByLvmId(full_name)
self.assertTrue(thin_lv_path == lookup_lv_path, self.assertTrue(
"%s != %s" % (thin_lv_path, lookup_lv_path)) thin_lv_path == lookup_lv_path,
"%s != %s" % (thin_lv_path, lookup_lv_path))
# Rename the VG # Rename the VG
new_name = 'renamed_' + vg.Name new_name = 'renamed_' + vg.Name
@ -411,13 +416,15 @@ class TestDbusService(unittest.TestCase):
for l in lv_paths: for l in lv_paths:
lv_proxy = ClientProxy(self.bus, l).LvCommon lv_proxy = ClientProxy(self.bus, l).LvCommon
self.assertTrue(lv_proxy.Vg == vg.object_path, "%s != %s" % self.assertTrue(
(lv_proxy.Vg, vg.object_path)) lv_proxy.Vg == vg.object_path, "%s != %s" %
(lv_proxy.Vg, vg.object_path))
full_name = "%s/%s" % (new_name, lv_proxy.Name) full_name = "%s/%s" % (new_name, lv_proxy.Name)
# print('Full Name %s' % (full_name)) # print('Full Name %s' % (full_name))
lv_path = mgr.LookUpByLvmId(full_name) lv_path = mgr.LookUpByLvmId(full_name)
self.assertTrue(lv_path == lv_proxy.object_path, "%s != %s" % self.assertTrue(
(lv_path, lv_proxy.object_path)) lv_path == lv_proxy.object_path, "%s != %s" %
(lv_path, lv_proxy.object_path))
# noinspection PyTypeChecker # noinspection PyTypeChecker
self._verify_hidden_lookups(thin_pool.LvCommon, new_name) self._verify_hidden_lookups(thin_pool.LvCommon, new_name)
@ -444,13 +451,13 @@ class TestDbusService(unittest.TestCase):
self._test_lv_create( self._test_lv_create(
vg.LvCreate, vg.LvCreate,
(lv_n(), mib(4), (lv_n(), mib(4),
dbus.Array([], '(ott)'), g_tmo, {}), vg) dbus.Array([], '(ott)'), g_tmo, {}), vg)
def test_lv_create_job(self): def test_lv_create_job(self):
vg = self._vg_create().Vg vg = self._vg_create().Vg
(object_path, job_path) = vg.LvCreate(lv_n(), mib(4), (object_path, job_path) = vg.LvCreate(
dbus.Array([], '(ott)'), 0, {}) lv_n(), mib(4), dbus.Array([], '(ott)'), 0, {})
self.assertTrue(object_path == '/') self.assertTrue(object_path == '/')
self.assertTrue(job_path != '/') self.assertTrue(job_path != '/')
@ -471,9 +478,7 @@ class TestDbusService(unittest.TestCase):
vg = self._vg_create(pv_paths).Vg vg = self._vg_create(pv_paths).Vg
self._test_lv_create( self._test_lv_create(
vg.LvCreateStriped, vg.LvCreateStriped, (lv_n(), mib(4), 2, 8, False, g_tmo, {}), vg)
(lv_n(), mib(4), 2, 8, False,
g_tmo, {}), vg)
def test_lv_create_mirror(self): def test_lv_create_mirror(self):
pv_paths = [] pv_paths = []
@ -481,8 +486,8 @@ class TestDbusService(unittest.TestCase):
pv_paths.append(pp.object_path) pv_paths.append(pp.object_path)
vg = self._vg_create(pv_paths).Vg vg = self._vg_create(pv_paths).Vg
self._test_lv_create(vg.LvCreateMirror, self._test_lv_create(
(lv_n(), mib(4), 2, g_tmo, {}), vg) vg.LvCreateMirror, (lv_n(), mib(4), 2, g_tmo, {}), vg)
def test_lv_create_raid(self): def test_lv_create_raid(self):
pv_paths = [] pv_paths = []
@ -490,9 +495,8 @@ class TestDbusService(unittest.TestCase):
pv_paths.append(pp.object_path) pv_paths.append(pp.object_path)
vg = self._vg_create(pv_paths).Vg vg = self._vg_create(pv_paths).Vg
self._test_lv_create(vg.LvCreateRaid, self._test_lv_create(
(lv_n(), 'raid4', vg.LvCreateRaid, (lv_n(), 'raid4', mib(16), 2, 8, g_tmo, {}), vg)
mib(16), 2, 8, g_tmo, {}), vg)
def _create_lv(self, thinpool=False, size=None, vg=None): def _create_lv(self, thinpool=False, size=None, vg=None):
@ -535,8 +539,9 @@ class TestDbusService(unittest.TestCase):
def test_lv_thinpool_rename(self): def test_lv_thinpool_rename(self):
# Rename a thin pool # Rename a thin pool
tp = self._create_lv(True) tp = self._create_lv(True)
self.assertTrue(THINPOOL_LV_PATH in tp.object_path, self.assertTrue(
"%s" % (tp.object_path)) THINPOOL_LV_PATH in tp.object_path,
"%s" % (tp.object_path))
new_name = 'renamed_' + tp.LvCommon.Name new_name = 'renamed_' + tp.LvCommon.Name
self.handle_return(tp.Lv.Rename(new_name, g_tmo, {})) self.handle_return(tp.Lv.Rename(new_name, g_tmo, {}))
@ -612,11 +617,13 @@ class TestDbusService(unittest.TestCase):
pv = vg.Pvs pv = vg.Pvs
pv_proxy = ClientProxy(self.bus, pv[0]) pvp = ClientProxy(self.bus, pv[0])
self._test_lv_create(vg.LvCreate, (lv_n(), mib(4), self._test_lv_create(
dbus.Array([[pv_proxy.object_path, 0, (pv_proxy.Pv.PeCount - 1)]], vg.LvCreate,
'(ott)'), g_tmo, {}), vg) (lv_n(), mib(4),
dbus.Array([[pvp.object_path, 0, (pvp.Pv.PeCount - 1)]],
'(ott)'), g_tmo, {}), vg)
def test_lv_resize(self): def test_lv_resize(self):
@ -646,8 +653,8 @@ class TestDbusService(unittest.TestCase):
rc = self.handle_return( rc = self.handle_return(
lv.Lv.Resize( lv.Lv.Resize(
size, size,
dbus.Array([[p.object_path, 0, p.Pv.PeCount - 1]], dbus.Array(
'(oii)'), [[p.object_path, 0, p.Pv.PeCount - 1]], '(oii)'),
g_tmo, {})) g_tmo, {}))
else: else:
rc = self.handle_return( rc = self.handle_return(
@ -674,8 +681,8 @@ class TestDbusService(unittest.TestCase):
lv = self._create_lv(vg=vg) lv = self._create_lv(vg=vg)
with self.assertRaises(dbus.exceptions.DBusException): with self.assertRaises(dbus.exceptions.DBusException):
lv.Lv.Resize(lv.LvCommon.SizeBytes, dbus.Array([], '(oii)'), lv.Lv.Resize(
-1, {}) lv.LvCommon.SizeBytes, dbus.Array([], '(oii)'), -1, {})
def test_lv_move(self): def test_lv_move(self):
lv = self._create_lv() lv = self._create_lv()
@ -693,8 +700,8 @@ class TestDbusService(unittest.TestCase):
lv.update() lv.update()
new_pv = str(lv.LvCommon.Devices[0][0]) new_pv = str(lv.LvCommon.Devices[0][0])
self.assertTrue(pv_path_move != new_pv, "%s == %s" % self.assertTrue(
(pv_path_move, new_pv)) pv_path_move != new_pv, "%s == %s" % (pv_path_move, new_pv))
def test_lv_activate_deactivate(self): def test_lv_activate_deactivate(self):
lv_p = self._create_lv() lv_p = self._create_lv()
@ -923,8 +930,9 @@ class TestDbusService(unittest.TestCase):
vg.MaxPvSet(p, g_tmo, {})) vg.MaxPvSet(p, g_tmo, {}))
self.assertEqual(rc, '/') self.assertEqual(rc, '/')
vg.update() vg.update()
self.assertTrue(vg.MaxPv == p, "Expected %s != Actual %s" % self.assertTrue(
(str(p), str(vg.MaxPv))) vg.MaxPv == p,
"Expected %s != Actual %s" % (str(p), str(vg.MaxPv)))
def test_vg_max_lv(self): def test_vg_max_lv(self):
vg = self._vg_create().Vg vg = self._vg_create().Vg
@ -935,8 +943,9 @@ class TestDbusService(unittest.TestCase):
rc = self.handle_return(vg.MaxLvSet(p, g_tmo, {})) rc = self.handle_return(vg.MaxLvSet(p, g_tmo, {}))
self.assertEqual(rc, '/') self.assertEqual(rc, '/')
vg.update() vg.update()
self.assertTrue(vg.MaxLv == p, "Expected %s != Actual %s" % self.assertTrue(
(str(p), str(vg.MaxLv))) vg.MaxLv == p,
"Expected %s != Actual %s" % (str(p), str(vg.MaxLv)))
def test_vg_uuid_gen(self): def test_vg_uuid_gen(self):
# TODO renable test case when # TODO renable test case when
@ -950,8 +959,9 @@ class TestDbusService(unittest.TestCase):
rc = self.handle_return(vg.UuidGenerate(g_tmo, {})) rc = self.handle_return(vg.UuidGenerate(g_tmo, {}))
self.assertEqual(rc, '/') self.assertEqual(rc, '/')
vg.update() vg.update()
self.assertTrue(vg.Uuid != prev_uuid, "Expected %s != Actual %s" % self.assertTrue(
(vg.Uuid, prev_uuid)) vg.Uuid != prev_uuid,
"Expected %s != Actual %s" % (vg.Uuid, prev_uuid))
def test_vg_activate_deactivate(self): def test_vg_activate_deactivate(self):
vg = self._vg_create().Vg vg = self._vg_create().Vg
@ -1032,8 +1042,9 @@ class TestDbusService(unittest.TestCase):
self.assertEqual( self.assertEqual(
self.handle_return( self.handle_return(
mgr.PvScan(False, True, dbus.Array([], 's'), mgr.PvScan(
dbus.Array([], '(ii)'), g_tmo, {})), '/') False, True, dbus.Array([], 's'),
dbus.Array([], '(ii)'), g_tmo, {})), '/')
self.assertEqual(self._refresh(), 0) self.assertEqual(self._refresh(), 0)
self.assertEqual( self.assertEqual(
@ -1063,13 +1074,8 @@ class TestDbusService(unittest.TestCase):
mm.append((int(d.properties['MAJOR']), int(d.properties['MINOR']))) mm.append((int(d.properties['MAJOR']), int(d.properties['MINOR'])))
self.assertEqual( self.assertEqual(
self.handle_return( self.handle_return(
mgr.PvScan mgr.PvScan(False, True, block_path, mm, g_tmo, {})), '/')
(False, True,
block_path,
mm, g_tmo, {})
), '/')
self.assertEqual(self._refresh(), 0) self.assertEqual(self._refresh(), 0)
@ -1150,8 +1156,8 @@ class TestDbusService(unittest.TestCase):
vg, cache_pool = self._create_cache_pool() vg, cache_pool = self._create_cache_pool()
self.assertTrue('/com/redhat/lvmdbus1/CachePool' in self.assertTrue(
cache_pool.object_path) '/com/redhat/lvmdbus1/CachePool' in cache_pool.object_path)
def test_cache_lv_create(self): def test_cache_lv_create(self):
@ -1167,11 +1173,10 @@ class TestDbusService(unittest.TestCase):
cached_lv = ClientProxy(self.bus, c_lv_path) cached_lv = ClientProxy(self.bus, c_lv_path)
uncached_lv_path = self.handle_return( uncached_lv_path = self.handle_return(
cached_lv.CachedLv.DetachCachePool( cached_lv.CachedLv.DetachCachePool(destroy_cache, g_tmo, {}))
destroy_cache, g_tmo, {}))
self.assertTrue('/com/redhat/lvmdbus1/Lv' in self.assertTrue(
uncached_lv_path) '/com/redhat/lvmdbus1/Lv' in uncached_lv_path)
rc = self.handle_return(vg.Remove(g_tmo, {})) rc = self.handle_return(vg.Remove(g_tmo, {}))
self.assertTrue(rc == '/') self.assertTrue(rc == '/')
@ -1236,19 +1241,21 @@ class TestDbusService(unittest.TestCase):
lv_n() + c, lv_n() + c,
mib(4), False, g_tmo, {})) mib(4), False, g_tmo, {}))
for r in ("_cdata", "_cmeta", "_corig", "_mimage", "_mlog", for reserved in (
"_pmspare", "_rimage", "_rmeta", "_tdata", "_tmeta", "_vorigin"): "_cdata", "_cmeta", "_corig", "_mimage", "_mlog",
"_pmspare", "_rimage", "_rmeta", "_tdata", "_tmeta",
"_vorigin"):
with self.assertRaises(dbus.exceptions.DBusException): with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return( self.handle_return(
vg_proxy.Vg.LvCreateLinear( vg_proxy.Vg.LvCreateLinear(
lv_n() + r, lv_n() + reserved,
mib(4), False, g_tmo, {})) mib(4), False, g_tmo, {}))
for r in ("snapshot", "pvmove"): for reserved in ("snapshot", "pvmove"):
with self.assertRaises(dbus.exceptions.DBusException): with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return( self.handle_return(
vg_proxy.Vg.LvCreateLinear( vg_proxy.Vg.LvCreateLinear(
r + lv_n(), reserved + lv_n(),
mib(4), False, g_tmo, {})) mib(4), False, g_tmo, {}))
_ALLOWABLE_TAG_CH = string.ascii_letters + string.digits + "._-+/=!:&#" _ALLOWABLE_TAG_CH = string.ascii_letters + string.digits + "._-+/=!:&#"
@ -1288,16 +1295,18 @@ class TestDbusService(unittest.TestCase):
for i in range(1, 64): for i in range(1, 64):
tag = rs(i, "", self._ALLOWABLE_TAG_CH) tag = rs(i, "", self._ALLOWABLE_TAG_CH)
r = self.handle_return( tmp = self.handle_return(
vg_proxy.Vg.TagsAdd([tag], g_tmo, {})) vg_proxy.Vg.TagsAdd([tag], g_tmo, {}))
self.assertTrue(r == '/') self.assertTrue(tmp == '/')
vg_proxy.update() vg_proxy.update()
self.assertTrue(tag in vg_proxy.Vg.Tags, "%s not in %s" % self.assertTrue(
(tag, str(vg_proxy.Vg.Tags))) tag in vg_proxy.Vg.Tags,
"%s not in %s" % (tag, str(vg_proxy.Vg.Tags)))
self.assertEqual(i, len(vg_proxy.Vg.Tags), "%d != %d" % self.assertEqual(
(i, len(vg_proxy.Vg.Tags))) i, len(vg_proxy.Vg.Tags),
"%d != %d" % (i, len(vg_proxy.Vg.Tags)))
def test_tag_regression(self): def test_tag_regression(self):
mgr = self.objs[MANAGER_INT][0].Manager mgr = self.objs[MANAGER_INT][0].Manager
@ -1309,14 +1318,15 @@ class TestDbusService(unittest.TestCase):
tag = '--h/K.6g0A4FOEatf3+k_nI/Yp&L_u2oy-=j649x:+dUcYWPEo6.IWT0c' tag = '--h/K.6g0A4FOEatf3+k_nI/Yp&L_u2oy-=j649x:+dUcYWPEo6.IWT0c'
r = self.handle_return( tmp = self.handle_return(
vg_proxy.Vg.TagsAdd([tag], g_tmo, {}) vg_proxy.Vg.TagsAdd([tag], g_tmo, {})
) )
self.assertTrue(r == '/') self.assertTrue(tmp == '/')
vg_proxy.update() vg_proxy.update()
self.assertTrue(tag in vg_proxy.Vg.Tags, "%s not in %s" % self.assertTrue(
(tag, str(vg_proxy.Vg.Tags))) tag in vg_proxy.Vg.Tags,
"%s not in %s" % (tag, str(vg_proxy.Vg.Tags)))
class AggregateResults(object): class AggregateResults(object):
@ -1364,7 +1374,8 @@ if __name__ == '__main__':
r.register_result(unittest.main(exit=False)) r.register_result(unittest.main(exit=False))
else: else:
r.register_fail() r.register_fail()
std_err_print("ERROR: Unable to dynamically configure " std_err_print(
"service to use lvm shell!") "ERROR: Unable to dynamically configure service to use "
"lvm shell!")
r.exit_run() r.exit_run()

View File

@ -32,8 +32,7 @@ THINPOOL_LV_PATH = '/' + THINPOOL_INT.replace('.', '/')
def rs(length, suffix, character_set=string.ascii_lowercase): def rs(length, suffix, character_set=string.ascii_lowercase):
return ''.join(random.choice(character_set) return ''.join(random.choice(character_set) for _ in range(length)) + suffix
for _ in range(length)) + suffix
def mib(s): def mib(s):
@ -50,8 +49,7 @@ class DbusIntrospection(object):
for c in root: for c in root:
if c.tag == "interface": if c.tag == "interface":
in_f = c.attrib['name'] in_f = c.attrib['name']
interfaces[in_f] = \ interfaces[in_f] = dict(methods=OrderedDict(), properties={})
dict(methods=OrderedDict(), properties={})
for nested in c: for nested in c:
if nested.tag == "method": if nested.tag == "method":
mn = nested.attrib['name'] mn = nested.attrib['name']
@ -71,7 +69,8 @@ class DbusIntrospection(object):
v = dict( v = dict(
name=mn, name=mn,
a_dir=arg_dir, a_dir=arg_dir,
a_type=arg_type) a_type=arg_type
)
interfaces[in_f]['methods'][mn][n] = v interfaces[in_f]['methods'][mn][n] = v
elif nested.tag == 'property': elif nested.tag == 'property':
@ -148,10 +147,9 @@ def verify_type(value, dbus_str_rep):
# print("%s ~= %s" % (dbus_str_rep, actual_str_rep)) # print("%s ~= %s" % (dbus_str_rep, actual_str_rep))
# Unless we have a full filled out type we won't match exactly # Unless we have a full filled out type we won't match exactly
if not dbus_str_rep.startswith(actual_str_rep): if not dbus_str_rep.startswith(actual_str_rep):
raise RuntimeError("Incorrect type, expected= %s actual " raise RuntimeError(
"= %s object= %s" % "Incorrect type, expected= %s actual = %s object= %s" %
(dbus_str_rep, actual_str_rep, (dbus_str_rep, actual_str_rep, str(type(value))))
str(type(value))))
class RemoteObject(object): class RemoteObject(object):
@ -174,12 +172,14 @@ class RemoteObject(object):
if props: if props:
for kl, vl in list(props.items()): for kl, vl in list(props.items()):
# Verify type is correct! # Verify type is correct!
verify_type(vl, verify_type(
vl,
self.introspect[self.interface]['properties'][kl]['p_type']) self.introspect[self.interface]['properties'][kl]['p_type'])
setattr(self, kl, vl) setattr(self, kl, vl)
def __init__(self, specified_bus, object_path, interface, introspect, def __init__(
properties=None): self, specified_bus, object_path, interface, introspect,
properties=None):
self.object_path = object_path self.object_path = object_path
self.interface = interface self.interface = interface
self.bus = specified_bus self.bus = specified_bus
@ -238,11 +238,11 @@ class ClientProxy(object):
# print('Client proxy has interface: %s %s' % (k, sn)) # print('Client proxy has interface: %s %s' % (k, sn))
if interface and interface == k and props is not None: if interface and interface == k and props is not None:
ro = RemoteObject(specified_bus, object_path, k, ro = RemoteObject(
self.intro_spect, props) specified_bus, object_path, k, self.intro_spect, props)
else: else:
ro = RemoteObject(specified_bus, object_path, k, ro = RemoteObject(
self.intro_spect) specified_bus, object_path, k, self.intro_spect)
setattr(self, sn, ro) setattr(self, sn, ro)