#!/usr/bin/env python3 # Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved. # # This copyrighted material is made available to anyone wishing to use, # modify, copy, or redistribute it subject to the terms and conditions # of the GNU General Public License v.2. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import dbus # noinspection PyUnresolvedReferences from dbus.mainloop.glib import DBusGMainLoop import unittest import sys import random import string import functools import time import pyudev import os import xml.etree.ElementTree as Et from collections import OrderedDict BUSNAME = "com.redhat.lvmdbus1" MANAGER_INT = BUSNAME + '.Manager' MANAGER_OBJ = '/' + BUSNAME.replace('.', '/') + 'Manager' PV_INT = BUSNAME + ".Pv" VG_INT = BUSNAME + ".Vg" LV_INT = BUSNAME + ".Lv" THINPOOL_INT = BUSNAME + ".ThinPool" SNAPSHOT_INT = BUSNAME + ".Snapshot" LV_COMMON_INT = BUSNAME + ".LvCommon" JOB_INT = BUSNAME + ".Job" CACHE_POOL_INT = BUSNAME + ".CachePool" CACHE_LV_INT = BUSNAME + ".CachedLv" THINPOOL_LV_PATH = '/' + THINPOOL_INT.replace('.', '/') def rs(length, suffix, character_set=string.ascii_lowercase): return ''.join(random.choice(character_set) for _ in range(length)) + suffix use_session = os.getenv('LVMDBUSD_USE_SESSION', False) if use_session: bus = dbus.SessionBus(mainloop=DBusGMainLoop()) else: bus = dbus.SystemBus(mainloop=DBusGMainLoop()) def mib(s): return 1024 * 1024 * s class DbusIntrospection(object): @staticmethod def introspect(xml_representation): interfaces = {} root = Et.fromstring(xml_representation) for c in root: if c.tag == "interface": in_f = c.attrib['name'] interfaces[in_f] = \ dict(methods=OrderedDict(), properties={}) for nested in c: if nested.tag == "method": mn = nested.attrib['name'] interfaces[in_f]['methods'][mn] = OrderedDict() for arg in nested: if arg.tag == 'arg': arg_dir = arg.attrib['direction'] if arg_dir == 'in': n = arg.attrib['name'] else: n = None arg_type = arg.attrib['type'] if n: v = dict( name=mn, a_dir=arg_dir, a_type=arg_type) interfaces[in_f]['methods'][mn][n] = v elif nested.tag == 'property': pn = nested.attrib['name'] p_access = nested.attrib['access'] p_type = nested.attrib['type'] interfaces[in_f]['properties'][pn] = \ dict(p_access=p_access, p_type=p_type) else: pass # print('Interfaces...') # for k, v in list(interfaces.items()): # print('Interface %s' % k) # if v['methods']: # for m, args in list(v['methods'].items()): # print(' method: %s' % m) # for a, aa in args.items(): # print(' method arg: %s' % (a)) # if v['properties']: # for p, d in list(v['properties'].items()): # print(' Property: %s' % (p)) # print('End interfaces') return interfaces class RemoteObject(object): def _set_props(self, props=None): # print 'Fetching properties' if not props: # prop_fetch = dbus.Interface(self.bus.get_object( # BUSNAME, self.object_path), 'org.freedesktop.DBus.Properties') for i in range(0, 3): try: prop_fetch = dbus.Interface(self.bus.get_object( BUSNAME, self.object_path), 'org.freedesktop.DBus.Properties') props = prop_fetch.GetAll(self.interface) break except dbus.exceptions.DBusException as dbe: if "GetAll" not in str(dbe): raise dbe if props: for kl, vl in list(props.items()): setattr(self, kl, vl) def __init__(self, specified_bus, object_path, interface, properties=None): self.object_path = object_path self.interface = interface self.bus = specified_bus self.dbus_method = dbus.Interface(specified_bus.get_object( BUSNAME, self.object_path), self.interface) self._set_props(properties) def __getattr__(self, item): if hasattr(self.dbus_method, item): return functools.partial(self._wrapper, item) else: return functools.partial(self, item) def _wrapper(self, _method_name, *args, **kwargs): return getattr(self.dbus_method, _method_name)(*args, **kwargs) def update(self): self._set_props() class ClientProxy(object): @staticmethod def _intf_short_name(nm): return nm.split('.')[-1:][0] def __init__(self, specified_bus, object_path, interface=None, props=None): i = dbus.Interface(specified_bus.get_object( BUSNAME, object_path), 'org.freedesktop.DBus.Introspectable') self.intro_spect = DbusIntrospection.introspect(i.Introspect()) for k in self.intro_spect.keys(): sn = ClientProxy._intf_short_name(k) # print('Client proxy has interface: %s %s' % (k, sn)) if interface and interface == k and props is not None: ro = RemoteObject(specified_bus, object_path, k, props) else: ro = RemoteObject(specified_bus, object_path, k) setattr(self, sn, ro) self.object_path = object_path def update(self): # Go through all interfaces and update them for int_f in self.intro_spect.keys(): sn = ClientProxy._intf_short_name(int_f) getattr(self, sn).update() def get_objects(): rc = {MANAGER_INT: [], PV_INT: [], VG_INT: [], LV_INT: [], THINPOOL_INT: [], JOB_INT: [], SNAPSHOT_INT: [], LV_COMMON_INT: [], CACHE_POOL_INT: [], CACHE_LV_INT: []} manager = dbus.Interface(bus.get_object( BUSNAME, "/com/redhat/lvmdbus1"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() for object_path, val in list(objects.items()): for interface, props in list(val.items()): o = ClientProxy(bus, object_path, interface, props) rc[interface].append(o) return rc, bus def set_execution(lvmshell): lvm_manager = dbus.Interface(bus.get_object( BUSNAME, "/com/redhat/lvmdbus1/Manager"), "com.redhat.lvmdbus1.Manager") lvm_manager.UseLvmShell(lvmshell) # noinspection PyUnresolvedReferences class TestDbusService(unittest.TestCase): def setUp(self): # Because of the sensitive nature of running LVM tests we will only # run if we have PVs and nothing else, so that we can be confident that # we are not mucking with someones data on their system self.objs, self.bus = get_objects() if len(self.objs[PV_INT]) == 0: print('No PVs present exiting!') sys.exit(1) if len(self.objs[MANAGER_INT]) != 1: print('Expecting a manager object!') sys.exit(1) if len(self.objs[VG_INT]) != 0: print('Expecting no VGs to exist!') sys.exit(1) self.pvs = [] for p in self.objs[PV_INT]: self.pvs.append(p.Pv.Name) def tearDown(self): # If we get here it means we passed setUp, so lets remove anything # and everything that remains, besides the PVs themselves self.objs, self.bus = get_objects() for v in self.objs[VG_INT]: # print "DEBUG: Removing VG= ", v.Uuid, v.Name v.Vg.Remove(-1, {}) # Check to make sure the PVs we had to start exist, else re-create # them if len(self.pvs) != len(self.objs[PV_INT]): for p in self.pvs: found = False for pc in self.objs[PV_INT]: if pc.Pv.Name == p: found = True break if not found: # print('Re-creating PV=', p) self._pv_create(p) def _pv_create(self, device): pv_path = self.objs[MANAGER_INT][0].Manager.PvCreate(device, -1, {})[0] self.assertTrue(pv_path is not None and len(pv_path) > 0) return pv_path def _manager(self): return self.objs[MANAGER_INT][0] def _refresh(self): return self._manager().Manager.Refresh() def test_refresh(self): rc = self._refresh() self.assertEqual(rc, 0) def test_version(self): rc = self.objs[MANAGER_INT][0].Manager.Version self.assertTrue(rc is not None and len(rc) > 0) self.assertEqual(self._refresh(), 0) def _vg_create(self, pv_paths=None): if not pv_paths: pv_paths = [self.objs[PV_INT][0].object_path] vg_name = rs(8, '_vg') vg_path = self.objs[MANAGER_INT][0].Manager.VgCreate( vg_name, pv_paths, -1, {})[0] self.assertTrue(vg_path is not None and len(vg_path) > 0) return ClientProxy(self.bus, vg_path) def test_vg_create(self): self._vg_create() self.assertEqual(self._refresh(), 0) def test_vg_delete(self): vg = self._vg_create().Vg vg.Remove(-1, {}) self.assertEqual(self._refresh(), 0) def _pv_remove(self, pv): rc = pv.Pv.Remove(-1, {}) return rc def test_pv_remove_add(self): target = self.objs[PV_INT][0] # Remove the PV rc = self._pv_remove(target) self.assertTrue(rc == '/') self.assertEqual(self._refresh(), 0) # Add it back rc = self._pv_create(target.Pv.Name)[0] self.assertTrue(rc == '/') self.assertEqual(self._refresh(), 0) def _lookup(self, lvm_id): return self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(lvm_id) def test_lookup_by_lvm_id(self): # For the moment lets just lookup what we know about which is PVs # When we start testing VGs and LVs we will test lookups for those # during those unit tests for p in self.objs[PV_INT]: rc = self._lookup(p.Pv.Name) self.assertTrue(rc is not None and rc != '/') # Search for something which doesn't exist rc = self._lookup('/dev/null') self.assertTrue(rc == '/') def test_vg_extend(self): # Create a VG self.assertTrue(len(self.objs[PV_INT]) >= 2) if len(self.objs[PV_INT]) >= 2: pv_initial = self.objs[PV_INT][0] pv_next = self.objs[PV_INT][1] vg = self._vg_create([pv_initial.object_path]).Vg path = vg.Extend([pv_next.object_path], -1, {}) self.assertTrue(path == '/') self.assertEqual(self._refresh(), 0) # noinspection PyUnresolvedReferences def test_vg_reduce(self): self.assertTrue(len(self.objs[PV_INT]) >= 2) if len(self.objs[PV_INT]) >= 2: vg = self._vg_create( [self.objs[PV_INT][0].object_path, self.objs[PV_INT][1].object_path]).Vg path = vg.Reduce(False, [vg.Pvs[0]], -1, {}) self.assertTrue(path == '/') self.assertEqual(self._refresh(), 0) # noinspection PyUnresolvedReferences def test_vg_rename(self): vg = self._vg_create().Vg mgr = self.objs[MANAGER_INT][0].Manager # Do a vg lookup path = self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(vg.Name) vg_name_start = vg.Name prev_path = path self.assertTrue(path != '/', "%s" % (path)) # Create some LVs in the VG for i in range(0, 5): lv_t = self._create_lv(size=mib(4), vg=vg) full_name = "%s/%s" % (vg_name_start, lv_t.LvCommon.Name) lv_path = mgr.LookUpByLvmId(full_name) self.assertTrue(lv_path == lv_t.object_path) new_name = 'renamed_' + vg.Name path = vg.Rename(new_name, -1, {}) self.assertTrue(path == '/') self.assertEqual(self._refresh(), 0) # Do a vg lookup path = mgr.LookUpByLvmId(new_name) self.assertTrue(path != '/', "%s" % (path)) self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path)) # Go through each LV and make sure it has the correct path back to the # VG vg.update() lv_paths = vg.Lvs self.assertTrue(len(lv_paths) == 5) for l in lv_paths: lv_proxy = ClientProxy(self.bus, l).LvCommon self.assertTrue(lv_proxy.Vg == vg.object_path, "%s != %s" % (lv_proxy.Vg, vg.object_path)) full_name = "%s/%s" % (new_name, lv_proxy.Name) lv_path = mgr.LookUpByLvmId(full_name) self.assertTrue(lv_path == lv_proxy.object_path, "%s != %s" % (lv_path, lv_proxy.object_path)) def _verify_hidden_lookups(self, lv_common_object, vgname): mgr = self.objs[MANAGER_INT][0].Manager hidden_lv_paths = lv_common_object.HiddenLvs for h in hidden_lv_paths: h_lv = ClientProxy(self.bus, h).LvCommon if len(h_lv.HiddenLvs) > 0: self._verify_hidden_lookups(h_lv, vgname) # print("Hidden check %s %s" % (h, h_lv.Name)) full_name = "%s/%s" % (vgname, h_lv.Name) lookup_path = mgr.LookUpByLvmId(full_name) self.assertTrue(lookup_path != '/') self.assertTrue(lookup_path == h_lv.object_path) def test_vg_rename_with_thin_pool(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg vg_name_start = vg.Name mgr = self.objs[MANAGER_INT][0].Manager # Let's create a thin pool which uses a raid 5 meta and raid5 data # areas lv_meta_path = vg.LvCreateRaid( "meta_r5", "raid5", mib(4), 0, 0, -1, {})[0] lv_data_path = vg.LvCreateRaid( "data_r5", "raid5", mib(16), 0, 0, -1, {})[0] thin_pool_path = vg.CreateThinPool( lv_meta_path, lv_data_path, -1, {})[0] # Lets create some thin LVs thin_pool = ClientProxy(self.bus, thin_pool_path) # noinspection PyTypeChecker self._verify_hidden_lookups(thin_pool.LvCommon, vg_name_start) for i in range(0, 5): lv_name = rs(8, '_lv') thin_lv_path = thin_pool.ThinPool.LvCreate( lv_name, mib(16), -1, {})[0] self.assertTrue(thin_lv_path != '/') full_name = "%s/%s" % (vg_name_start, lv_name) lookup_lv_path = mgr.LookUpByLvmId(full_name) self.assertTrue(thin_lv_path == lookup_lv_path, "%s != %s" % (thin_lv_path, lookup_lv_path)) # Rename the VG new_name = 'renamed_' + vg.Name path = vg.Rename(new_name, -1, {}) self.assertTrue(path == '/') self.assertEqual(self._refresh(), 0) # Go through each LV and make sure it has the correct path back to the # VG vg.update() thin_pool.update() lv_paths = vg.Lvs for l in lv_paths: lv_proxy = ClientProxy(self.bus, l).LvCommon self.assertTrue(lv_proxy.Vg == vg.object_path, "%s != %s" % (lv_proxy.Vg, vg.object_path)) full_name = "%s/%s" % (new_name, lv_proxy.Name) # print('Full Name %s' % (full_name)) lv_path = mgr.LookUpByLvmId(full_name) self.assertTrue(lv_path == lv_proxy.object_path, "%s != %s" % (lv_path, lv_proxy.object_path)) # noinspection PyTypeChecker self._verify_hidden_lookups(thin_pool.LvCommon, new_name) def _test_lv_create(self, method, params, vg): lv = None path = method(*params)[0] self.assertTrue(vg) if path: lv = ClientProxy(self.bus, path) # TODO verify object properties self.assertEqual(self._refresh(), 0) return lv def test_lv_create(self): vg = self._vg_create().Vg self._test_lv_create( vg.LvCreate, (rs(8, '_lv'), mib(4), dbus.Array([], '(ott)'), -1, {}), vg) def test_lv_create_linear(self): vg = self._vg_create().Vg self._test_lv_create( vg.LvCreateLinear, (rs(8, '_lv'), mib(4), False, -1, {}), vg) def test_lv_create_striped(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg self._test_lv_create( vg.LvCreateStriped, (rs(8, '_lv'), mib(4), 2, 8, False, -1, {}), vg) def test_lv_create_mirror(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg self._test_lv_create(vg.LvCreateMirror, (rs(8, '_lv'), mib(4), 2, -1, {}), vg) def test_lv_create_raid(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg self._test_lv_create(vg.LvCreateRaid, (rs(8, '_lv'), 'raid4', mib(16), 2, 8, -1, {}), vg) def _create_lv(self, thinpool=False, size=None, vg=None): if not vg: pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg if size is None: size = mib(4) return self._test_lv_create( vg.LvCreateLinear, (rs(8, '_lv'), size, thinpool, -1, {}), vg) def test_lv_create_rounding(self): self._create_lv(size=(mib(2) + 13)) def test_lv_create_thin_pool(self): self._create_lv(True) def test_lv_rename(self): # Rename a regular LV lv = self._create_lv() path = self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(lv.LvCommon.Name) prev_path = path new_name = 'renamed_' + lv.LvCommon.Name lv.Lv.Rename(new_name, -1, {}) path = self.objs[MANAGER_INT][0].Manager.LookUpByLvmId(new_name) self.assertEqual(self._refresh(), 0) self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path)) def test_lv_thinpool_rename(self): # Rename a thin pool tp = self._create_lv(True) self.assertTrue(THINPOOL_LV_PATH in tp.object_path, "%s" % (tp.object_path)) new_name = 'renamed_' + tp.LvCommon.Name tp.Lv.Rename(new_name, -1, {}) tp.update() self.assertEqual(self._refresh(), 0) self.assertEqual(new_name, tp.LvCommon.Name) # noinspection PyUnresolvedReferences def test_lv_on_thin_pool_rename(self): # Rename a LV on a thin Pool # This returns a LV with the LV interface, need to get a proxy for # thinpool interface too tp = self._create_lv(True) thin_path = tp.ThinPool.LvCreate( rs(10, '_thin_lv'), mib(8), -1, {})[0] lv = ClientProxy(self.bus, thin_path) rc = lv.Lv.Rename('rename_test' + lv.LvCommon.Name, -1, {}) self.assertTrue(rc == '/') self.assertEqual(self._refresh(), 0) def test_lv_remove(self): lv = self._create_lv().Lv rc = lv.Remove(-1, {}) self.assertTrue(rc == '/') self.assertEqual(self._refresh(), 0) def test_lv_snapshot(self): lv_p = self._create_lv() ss_name = 'ss_' + lv_p.LvCommon.Name # Test waiting to complete ss, job = lv_p.Lv.Snapshot(ss_name, 0, -1, {}) self.assertTrue(ss != '/') self.assertTrue(job == '/') snapshot = ClientProxy(self.bus, ss) self.assertTrue(snapshot.LvCommon.Name == ss_name) self.assertEqual(self._refresh(), 0) # Test getting a job returned immediately rc, job = lv_p.Lv.Snapshot('ss2_' + lv_p.LvCommon.Name, 0, 0, {}) self.assertTrue(rc == '/') self.assertTrue(job != '/') self._wait_for_job(job) self.assertEqual(self._refresh(), 0) # noinspection PyUnresolvedReferences def _wait_for_job(self, j_path): import time rc = None j = ClientProxy(self.bus, j_path).Job while True: j.update() if j.Complete: (ec, error_msg) = j.GetError self.assertTrue(ec == 0, "%d :%s" % (ec, error_msg)) if ec == 0: self.assertTrue(j.Percent == 100, "P= %f" % j.Percent) rc = j.Result j.Remove() break if j.Wait(1): j.update() self.assertTrue(j.Complete) return rc def test_lv_create_pv_specific(self): vg = self._vg_create().Vg pv = vg.Pvs pv_proxy = ClientProxy(self.bus, pv[0]) self._test_lv_create(vg.LvCreate, (rs(8, '_lv'), mib(4), dbus.Array([[pv_proxy.object_path, 0, (pv_proxy.Pv.PeCount - 1)]], '(ott)'), -1, {}), vg) def test_lv_resize(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg lv = self._create_lv(vg=vg, size=mib(16)) for size in [ lv.LvCommon.SizeBytes + 4194304, lv.LvCommon.SizeBytes - 4194304, lv.LvCommon.SizeBytes + 2048, lv.LvCommon.SizeBytes - 2048]: pv_in_use = [i[0] for i in lv.LvCommon.Devices] # Select a PV in the VG that isn't in use pv_empty = [p for p in vg.Pvs if p not in pv_in_use] prev = lv.LvCommon.SizeBytes if len(pv_empty): p = ClientProxy(self.bus, pv_empty[0]) rc = lv.Lv.Resize( size, dbus.Array([[p.object_path, 0, p.Pv.PeCount - 1]], '(oii)'), -1, {}) else: rc = lv.Lv.Resize(size, dbus.Array([], '(oii)'), -1, {}) self.assertEqual(rc, '/') self.assertEqual(self._refresh(), 0) lv.update() if prev < size: self.assertTrue(lv.LvCommon.SizeBytes > prev) else: # We are testing re-sizing to same size too... self.assertTrue(lv.LvCommon.SizeBytes <= prev) def test_lv_resize_same(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg lv = self._create_lv(vg=vg) with self.assertRaises(dbus.exceptions.DBusException): lv.Lv.Resize(lv.LvCommon.SizeBytes, dbus.Array([], '(oii)'), -1, {}) def test_lv_move(self): lv = self._create_lv() pv_path_move = str(lv.LvCommon.Devices[0][0]) # Test moving a specific LV job = lv.Lv.Move(pv_path_move, (0, 0), dbus.Array([], '(oii)'), 0, {}) self._wait_for_job(job) self.assertEqual(self._refresh(), 0) lv.update() new_pv = str(lv.LvCommon.Devices[0][0]) self.assertTrue(pv_path_move != new_pv, "%s == %s" % (pv_path_move, new_pv)) def test_lv_activate_deactivate(self): lv_p = self._create_lv() lv_p.update() lv_p.Lv.Deactivate(0, -1, {}) lv_p.update() self.assertFalse(lv_p.LvCommon.Active) self.assertEqual(self._refresh(), 0) lv_p.Lv.Activate(0, -1, {}) lv_p.update() self.assertTrue(lv_p.LvCommon.Active) self.assertEqual(self._refresh(), 0) # Try control flags for i in range(0, 5): lv_p.Lv.Activate(1 << i, -1, {}) self.assertTrue(lv_p.LvCommon.Active) self.assertEqual(self._refresh(), 0) def test_move(self): lv = self._create_lv() # Test moving without being LV specific vg = ClientProxy(self.bus, lv.LvCommon.Vg).Vg pv_to_move = str(lv.LvCommon.Devices[0][0]) job = vg.Move(pv_to_move, (0, 0), dbus.Array([], '(oii)'), 0, {}) self._wait_for_job(job) self.assertEqual(self._refresh(), 0) # Test Vg.Move # TODO Test this more! vg.update() lv.update() location = lv.LvCommon.Devices[0][0] dst = None for p in vg.Pvs: if p != location: dst = p # Fetch the destination pv = ClientProxy(self.bus, dst).Pv # Test range, move it to the middle of the new destination and blocking # blocking for it to complete job = vg.Move( location, (0, 0), [(dst, pv.PeCount / 2, 0), ], -1, {}) self.assertEqual(job, '/') self.assertEqual(self._refresh(), 0) def test_job_handling(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg_name = rs(8, '_vg') # Test getting a job right away vg_path, vg_job = self.objs[MANAGER_INT][0].Manager.VgCreate( vg_name, pv_paths, 0, {}) self.assertTrue(vg_path == '/') self.assertTrue(vg_job and len(vg_job) > 0) self._wait_for_job(vg_job) def _test_expired_timer(self, num_lvs): rc = False pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) # In small configurations lvm is pretty snappy, so lets create a VG # add a number of LVs and then remove the VG and all the contained # LVs which appears to consistently run a little slow. vg_proxy = self._vg_create(pv_paths) for i in range(0, num_lvs): vg_proxy.update() if vg_proxy.Vg.FreeCount > 0: obj_path, job = vg_proxy.Vg.LvCreateLinear( rs(8, "_lv"), mib(4), False, -1, {}) self.assertTrue(job == '/') else: # We ran out of space, test will probably fail break # Make sure that we are honoring the timeout start = time.time() remove_job = vg_proxy.Vg.Remove(1, {}) end = time.time() tt_remove = float(end) - float(start) self.assertTrue(tt_remove < 2.0, "remove time %s" % (str(tt_remove))) # Depending on how long it took we could finish either way if remove_job != '/': # We got a job result = self._wait_for_job(remove_job) self.assertTrue(result == '/') rc = True else: # It completed before timer popped pass return rc def test_job_handling_timer(self): yes = False # This may not pass for i in [48, 64, 128]: yes = self._test_expired_timer(i) if yes: break print('Attempt (%d) failed, trying again...' % (i)) self.assertTrue(yes) def test_pv_tags(self): pvs = [] pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg # Get the PVs for p in vg.Pvs: pvs.append(ClientProxy(self.bus, p).Pv) for tags_value in [['hello'], ['foo', 'bar']]: rc = vg.PvTagsAdd(vg.Pvs, tags_value, -1, {}) self.assertTrue(rc == '/') for p in pvs: p.update() self.assertTrue(sorted(tags_value) == p.Tags) vg.PvTagsDel(vg.Pvs, tags_value, -1, {}) for p in pvs: p.update() self.assertTrue([] == p.Tags) def test_vg_tags(self): vg = self._vg_create().Vg t = ['Testing', 'tags'] vg.TagsAdd(t, -1, {}) vg.update() self.assertTrue(t == vg.Tags) vg.TagsDel(t, -1, {}) vg.update() self.assertTrue([] == vg.Tags) def test_lv_tags(self): vg = self._vg_create().Vg lv = self._test_lv_create( vg.LvCreateLinear, (rs(8, '_lv'), mib(4), False, -1, {}), vg) t = ['Testing', 'tags'] lv.Lv.TagsAdd(t, -1, {}) lv.update() self.assertTrue(t == lv.LvCommon.Tags) lv.Lv.TagsDel(t, -1, {}) lv.update() self.assertTrue([] == lv.LvCommon.Tags) def test_vg_allocation_policy_set(self): vg = self._vg_create().Vg for p in ['anywhere', 'contiguous', 'cling', 'normal']: rc = vg.AllocationPolicySet(p, -1, {}) self.assertEqual(rc, '/') vg.update() prop = getattr(vg, 'Alloc' + p.title()) self.assertTrue(prop) def test_vg_max_pv(self): vg = self._vg_create().Vg # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1280496 # TODO: Add a test back for larger values here when bug is resolved for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]: rc = vg.MaxPvSet(p, -1, {}) self.assertEqual(rc, '/') vg.update() self.assertTrue(vg.MaxPv == p, "Expected %s != Actual %s" % (str(p), str(vg.MaxPv))) def test_vg_max_lv(self): vg = self._vg_create().Vg # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1280496 # TODO: Add a test back for larger values here when bug is resolved for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]: rc = vg.MaxLvSet(p, -1, {}) self.assertEqual(rc, '/') vg.update() self.assertTrue(vg.MaxLv == p, "Expected %s != Actual %s" % (str(p), str(vg.MaxLv))) def test_vg_uuid_gen(self): # TODO renable test case when # https://bugzilla.redhat.com/show_bug.cgi?id=1264169 gets fixed # This was tested with lvmetad disabled and we passed print("\nSkipping Vg.UuidGenerate until BZ: 1264169 resolved\n") if False: vg = self._vg_create().Vg prev_uuid = vg.Uuid rc = vg.UuidGenerate(-1, {}) self.assertEqual(rc, '/') vg.update() self.assertTrue(vg.Uuid != prev_uuid, "Expected %s != Actual %s" % (vg.Uuid, prev_uuid)) def test_vg_activate_deactivate(self): vg = self._vg_create().Vg self._test_lv_create( vg.LvCreateLinear, (rs(8, '_lv'), mib(4), False, -1, {}), vg) vg.update() vg.Deactivate(0, -1, {}) self.assertEqual(self._refresh(), 0) vg.Activate(0, -1, {}) self.assertEqual(self._refresh(), 0) # Try control flags for i in range(0, 5): vg.Activate(1 << i, -1, {}) def test_pv_resize(self): self.assertTrue(len(self.objs[PV_INT]) > 0) if len(self.objs[PV_INT]) > 0: pv = ClientProxy(self.bus, self.objs[PV_INT][0].object_path).Pv original_size = pv.SizeBytes new_size = original_size / 2 pv.ReSize(new_size, -1, {}) self.assertEqual(self._refresh(), 0) pv.update() self.assertTrue(pv.SizeBytes != original_size) pv.ReSize(0, -1, {}) self.assertEqual(self._refresh(), 0) pv.update() self.assertTrue(pv.SizeBytes == original_size) def test_pv_allocation(self): pv_paths = [] for pp in self.objs[PV_INT]: pv_paths.append(pp.object_path) vg = self._vg_create(pv_paths).Vg pv = ClientProxy(self.bus, vg.Pvs[0]).Pv pv.AllocationEnabled(False, -1, {}) pv.update() self.assertFalse(pv.Allocatable) pv.AllocationEnabled(True, -1, {}) pv.update() self.assertTrue(pv.Allocatable) self.assertEqual(self._refresh(), 0) def _get_devices(self): context = pyudev.Context() return context.list_devices(subsystem='block', MAJOR='8') def test_pv_scan(self): devices = self._get_devices() mgr = self._manager().Manager self.assertEqual(mgr.PvScan(False, True, dbus.Array([], 's'), dbus.Array([], '(ii)'), -1, {}), '/') self.assertEqual(self._refresh(), 0) self.assertEqual(mgr.PvScan(False, False, dbus.Array([], 's'), dbus.Array([], '(ii)'), -1, {}), '/') self.assertEqual(self._refresh(), 0) block_path = [] for d in devices: block_path.append(d['DEVNAME']) self.assertEqual(mgr.PvScan(False, True, block_path, dbus.Array([], '(ii)'), -1, {}), '/') self.assertEqual(self._refresh(), 0) mm = [] for d in devices: mm.append((int(d['MAJOR']), int(d['MINOR']))) self.assertEqual(mgr.PvScan(False, True, block_path, mm, -1, {}), '/') self.assertEqual(self._refresh(), 0) self.assertEqual(mgr.PvScan(False, True, dbus.Array([], 's'), mm, -1, {}), '/') self.assertEqual(self._refresh(), 0) @staticmethod def _write_some_data(device_path, size): blocks = int(size / 512) block = bytearray(512) for i in range(0, 512): block[i] = i % 255 with open(device_path, mode='wb') as lv: for i in range(0, blocks): lv.write(block) def test_snapshot_merge(self): # Create a non-thin LV and merge it ss_size = mib(8) lv_p = self._create_lv(size=mib(16)) ss_name = lv_p.LvCommon.Name + '_snap' snapshot_path = lv_p.Lv.Snapshot(ss_name, ss_size, -1, {})[0] ss = ClientProxy(self.bus, snapshot_path) # Write some data to snapshot so merge takes some time TestDbusService._write_some_data(ss.LvCommon.Path, ss_size / 2) job_path = ss.Snapshot.Merge(0, {}) self.assertTrue(job_path != '/') self._wait_for_job(job_path) def test_snapshot_merge_thin(self): # Create a thin LV, snapshot it and merge it tp = self._create_lv(True) thin_path = tp.ThinPool.LvCreate( rs(10, '_thin_lv'), mib(10), -1, {})[0] lv_p = ClientProxy(self.bus, thin_path) ss_name = lv_p.LvCommon.Name + '_snap' snapshot_path = lv_p.Lv.Snapshot(ss_name, 0, -1, {})[0] ss = ClientProxy(self.bus, snapshot_path) job_path = ss.Snapshot.Merge(0, {}) self.assertTrue(job_path != '/') self._wait_for_job(job_path) def _create_cache_pool(self): vg = self._vg_create().Vg md = self._create_lv(size=(mib(8)), vg=vg) data = self._create_lv(size=(mib(8)), vg=vg) cache_pool_path = vg.CreateCachePool( md.object_path, data.object_path, -1, {})[0] cp = ClientProxy(self.bus, cache_pool_path) return (vg, cp) def test_cache_pool_create(self): vg, cache_pool = self._create_cache_pool() self.assertTrue('/com/redhat/lvmdbus1/CachePool' in cache_pool.object_path) def test_cache_lv_create(self): for destroy_cache in [True, False]: vg, cache_pool = self._create_cache_pool() lv_to_cache = self._create_lv(size=mib(8), vg=vg) c_lv_path = cache_pool.CachePool.CacheLv( lv_to_cache.object_path, -1, {})[0] cached_lv = ClientProxy(self.bus, c_lv_path) uncached_lv_path = \ cached_lv.CachedLv.DetachCachePool(destroy_cache, -1, {})[0] self.assertTrue('/com/redhat/lvmdbus1/Lv' in uncached_lv_path) vg.Remove(-1, {}) def test_vg_change(self): vg_proxy = self._vg_create() result = vg_proxy.Vg.Change(-1, {'-a': 'ay'}) self.assertTrue(result == '/') result = vg_proxy.Vg.Change(-1, {'-a': 'n'}) self.assertTrue(result == '/') def _invalid_vg_lv_name_characters(self): bad_vg_lv_set = set(string.printable) - \ set(string.ascii_letters + string.digits + '.-_+') return ''.join(bad_vg_lv_set) def test_invalid_names(self): mgr = self.objs[MANAGER_INT][0].Manager # Pv device path with self.assertRaises(dbus.exceptions.DBusException): mgr.PvCreate("/dev/space in name", -1, {}) # VG Name testing... # Go through all bad characters pv_paths = [self.objs[PV_INT][0].object_path] bad_chars = self._invalid_vg_lv_name_characters() for c in bad_chars: with self.assertRaises(dbus.exceptions.DBusException): mgr.VgCreate("name%s" % (c), pv_paths, -1, {}) # Bad names for bad in [".", ".."]: with self.assertRaises(dbus.exceptions.DBusException): mgr.VgCreate(bad, pv_paths, -1, {}) # Exceed name length for i in [128, 1024, 4096]: with self.assertRaises(dbus.exceptions.DBusException): mgr.VgCreate('T' * i, pv_paths, -1, {}) # Create a VG and try to create LVs with different bad names vg_path = mgr.VgCreate("test", pv_paths, -1, {})[0] vg_proxy = ClientProxy(self.bus, vg_path) for c in bad_chars: with self.assertRaises(dbus.exceptions.DBusException): vg_proxy.Vg.LvCreateLinear(rs(8, '_lv') + c, mib(4), False, -1, {}) for r in ("_cdata", "_cmeta", "_corig", "_mimage", "_mlog", "_pmspare", "_rimage", "_rmeta", "_tdata", "_tmeta", "_vorigin"): with self.assertRaises(dbus.exceptions.DBusException): vg_proxy.Vg.LvCreateLinear(rs(8, '_lv') + r, mib(4), False, -1, {}) for r in ("snapshot", "pvmove"): with self.assertRaises(dbus.exceptions.DBusException): vg_proxy.Vg.LvCreateLinear(r + rs(8, '_lv'), mib(4), False, -1, {}) _ALLOWABLE_TAG_CH = string.ascii_letters + string.digits + "._-+/=!:&#" def _invalid_tag_characters(self): bad_tag_ch_set = set(string.printable) - set(self._ALLOWABLE_TAG_CH) return ''.join(bad_tag_ch_set) def test_invalid_tags(self): mgr = self.objs[MANAGER_INT][0].Manager pv_paths = [self.objs[PV_INT][0].object_path] vg_path = mgr.VgCreate("test", pv_paths, -1, {})[0] vg_proxy = ClientProxy(self.bus, vg_path) for c in self._invalid_tag_characters(): with self.assertRaises(dbus.exceptions.DBusException): vg_proxy.Vg.TagsAdd([c], -1, {}) for c in self._invalid_tag_characters(): with self.assertRaises(dbus.exceptions.DBusException): vg_proxy.Vg.TagsAdd(["a%sb" % (c)], -1, {}) def test_tag_names(self): mgr = self.objs[MANAGER_INT][0].Manager pv_paths = [self.objs[PV_INT][0].object_path] vg_path = mgr.VgCreate("test", pv_paths, -1, {})[0] vg_proxy = ClientProxy(self.bus, vg_path) for i in range(1, 64): tag = rs(i, "", self._ALLOWABLE_TAG_CH) r = vg_proxy.Vg.TagsAdd([tag], -1, {}) self.assertTrue(r == '/') vg_proxy.update() self.assertTrue(tag in vg_proxy.Vg.Tags, "%s not in %s" % (tag, str(vg_proxy.Vg.Tags))) self.assertEqual(i, len(vg_proxy.Vg.Tags), "%d != %d" % (i, len(vg_proxy.Vg.Tags))) if __name__ == '__main__': # Test forking & exec new each time test_shell = os.getenv('LVM_DBUS_TEST_SHELL', 0) set_execution(False) if test_shell == 0: unittest.main(exit=True) else: unittest.main(exit=False) # Test lvm shell print('\n *** Testing lvm shell *** \n') set_execution(True) unittest.main()