1
0
mirror of git://sourceware.org/git/lvm2.git synced 2024-12-30 17:18:21 +03:00
lvm2/test/dbus/lvmdbustest.py
Tony Asleson 73121e3f07 lvmdbustest: Increase number of LVs
As storage is getting faster, we need to create more LVs to pass this test.
2022-06-30 10:55:16 -05:00

2122 lines
56 KiB
Python
Executable File

#!/usr/bin/python3
# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# noinspection PyUnresolvedReferences
import dbus
# noinspection PyUnresolvedReferences
from dbus.mainloop.glib import DBusGMainLoop
import unittest
import pyudev
from testlib import *
import testlib
from subprocess import Popen, PIPE
from glob import glob
import os
g_tmo = 0
# Approx. min size
VDO_MIN_SIZE = mib(8192)
# Prefix on created objects to enable easier clean-up
g_prefix = os.getenv('PREFIX', '')
# Check dev dir prefix for test suite (LVM_TEST_DEVDIR
dm_dev_dir = os.getenv('DM_DEV_DIR', '/dev')
# Use the session bus instead of the system bus
use_session = os.getenv('LVM_DBUSD_USE_SESSION', False)
# Only use the devices listed in the ENV variable
pv_device_list = os.getenv('LVM_DBUSD_PV_DEVICE_LIST', None)
# Default is to test all modes
# 0 == Only test fork & exec mode
# 1 == Only test lvm shell mode
# 2 == Test both fork & exec & lvm shell mode (default)
# Other == Test just lvm shell mode
test_shell = os.getenv('LVM_DBUSD_TEST_MODE', 2)
# LVM binary to use
LVM_EXECUTABLE = os.getenv('LVM_BINARY', '/usr/sbin/lvm')
# Empty options dictionary (EOD)
EOD = dbus.Dictionary({}, signature=dbus.Signature('sv'))
# Base interfaces on LV objects
LV_BASE_INT = (LV_COMMON_INT, LV_INT)
if use_session:
bus = dbus.SessionBus(mainloop=DBusGMainLoop())
else:
bus = dbus.SystemBus(mainloop=DBusGMainLoop())
# If we have multiple clients we will globally disable introspection
# validation to limit the massive amount of introspection calls we make as
# that method prevents things from executing concurrently
if pv_device_list:
testlib.validate_introspection = False
def vg_n(prefix=None):
name = rs(8, '_vg')
if prefix:
name = prefix + name
return g_prefix + name
def lv_n(suffix=None):
if not suffix:
s = '_lv'
else:
s = suffix
return rs(8, s)
def _is_testsuite_pv(pv_name):
return g_prefix != "" and pv_name[-1].isdigit() and \
pv_name[:-1].endswith(g_prefix + "pv")
def is_nested_pv(pv_name):
return pv_name.count('/') == 3 and not _is_testsuite_pv(pv_name)
def _root_pv_name(res, pv_name):
if not is_nested_pv(pv_name):
return pv_name
vg_name = pv_name.split('/')[2]
for v in res[VG_INT]:
if v.Vg.Name == vg_name:
for pv in res[PV_INT]:
if pv.object_path in v.Vg.Pvs:
return _root_pv_name(res, pv.Pv.Name)
return None
def _prune_lvs(res, interface, vg_object_path):
lvs = [lv for lv in res[interface] if lv.LvCommon.Vg == vg_object_path]
res[interface] = lvs
def _prune(res, pv_filter):
if pv_filter:
pv_lookup = {}
pv_list = []
for p in res[PV_INT]:
if _root_pv_name(res, p.Pv.Name) in pv_filter:
pv_list.append(p)
pv_lookup[p.object_path] = p
res[PV_INT] = pv_list
vg_list = []
for v in res[VG_INT]:
if v.Vg.Pvs[0] in pv_lookup:
vg_list.append(v)
for interface in \
[LV_INT, THINPOOL_INT, LV_COMMON_INT,
CACHE_POOL_INT, CACHE_LV_INT, VDOPOOL_INT]:
_prune_lvs(res, interface, v.object_path)
res[VG_INT] = vg_list
return res
def get_objects():
rc = {
MANAGER_INT: [], PV_INT: [], VG_INT: [], LV_INT: [],
THINPOOL_INT: [], JOB_INT: [], SNAPSHOT_INT: [], LV_COMMON_INT: [],
CACHE_POOL_INT: [], CACHE_LV_INT: [], VG_VDO_INT: [], VDOPOOL_INT: []}
object_manager_object = bus.get_object(
BUS_NAME, "/com/redhat/lvmdbus1", introspect=False)
manager_interface = dbus.Interface(
object_manager_object, "org.freedesktop.DBus.ObjectManager")
objects = manager_interface.GetManagedObjects()
for object_path, v in objects.items():
proxy = ClientProxy(bus, object_path, v)
for interface in v.keys():
rc[interface].append(proxy)
# At this point we have a full population of everything, we now need to
# prune the objects if we are filtering PVs with a sub selection.
return _prune(rc, pv_device_list), bus
def set_execution(lvmshell, test_result):
if lvmshell:
m = 'lvm shell (non-fork)'
else:
m = "forking & exec'ing"
lvm_manager = dbus.Interface(bus.get_object(
BUS_NAME, "/com/redhat/lvmdbus1/Manager", introspect=False),
"com.redhat.lvmdbus1.Manager")
rc = lvm_manager.UseLvmShell(lvmshell)
if rc:
std_err_print('Successfully changed execution mode to "%s"' % m)
else:
std_err_print('ERROR: Failed to change execution mode to "%s"' % m)
test_result.register_fail()
return rc
def call_lvm(command):
"""
Call lvm executable and return a tuple of exitcode, stdout, stderr
:param command: Command to execute
:type command: list
:returns (exitcode, stdout, stderr)
:rtype (int, str, str)
"""
# Prepend the full lvm executable so that we can run different versions
# in different locations on the same box
command.insert(0, LVM_EXECUTABLE)
process = Popen(
command, stdout=PIPE, stderr=PIPE, close_fds=True, env=os.environ)
out = process.communicate()
stdout_text = bytes(out[0]).decode("utf-8")
stderr_text = bytes(out[1]).decode("utf-8")
return process.returncode, stdout_text, stderr_text
def supports_vdo():
cmd = ['segtypes']
modprobe = Popen(["modprobe", "kvdo"], stdout=PIPE, stderr=PIPE, close_fds=True, env=os.environ)
modprobe.communicate()
if modprobe.returncode != 0:
return False
rc, out, err = call_lvm(cmd)
if rc != 0 or "vdo" not in out:
return False
return True
# noinspection PyUnresolvedReferences
class TestDbusService(unittest.TestCase):
def setUp(self):
# Because of the sensitive nature of running LVM tests we will only
# run if we have PVs and nothing else, so that we can be confident that
# we are not mucking with someones data on their system
self.objs, self.bus = get_objects()
if len(self.objs[PV_INT]) == 0:
std_err_print('No PVs present exiting!')
sys.exit(1)
if len(self.objs[MANAGER_INT]) != 1:
std_err_print('Expecting a manager object!')
sys.exit(1)
if len(self.objs[VG_INT]) != 0:
std_err_print('Expecting no VGs to exist!')
sys.exit(1)
self.pvs = []
for p in self.objs[PV_INT]:
self.pvs.append(p.Pv.Name)
self.vdo = supports_vdo()
def _recurse_vg_delete(self, vg_proxy, pv_proxy, nested_pv_hash):
for pv_device_name, t in nested_pv_hash.items():
vg_name = str(vg_proxy.Vg.Name)
if vg_name in pv_device_name:
self._recurse_vg_delete(t[0], t[1], nested_pv_hash)
break
vg_proxy.update()
self.handle_return(vg_proxy.Vg.Remove(dbus.Int32(g_tmo), EOD))
if is_nested_pv(pv_proxy.Pv.Name):
rc = self._pv_remove(pv_proxy)
self.assertTrue(rc == '/')
def tearDown(self):
# If we get here it means we passed setUp, so lets remove anything
# and everything that remains, besides the PVs themselves
self.objs, self.bus = get_objects()
# The self.objs[PV_INT] list only contains those which we should be
# mucking with, lets remove any embedded/nested PVs first, then proceed
# to walk the base PVs and remove the VGs
nested_pvs = {}
non_nested = []
for p in self.objs[PV_INT]:
if is_nested_pv(p.Pv.Name):
if p.Pv.Vg != '/':
v = ClientProxy(self.bus, p.Pv.Vg, interfaces=(VG_INT,))
nested_pvs[p.Pv.Name] = (v, p)
else:
# Nested PV with no VG, so just simply remove it!
self._pv_remove(p)
else:
non_nested.append(p)
for p in non_nested:
# When we remove a VG for a PV it could ripple across multiple
# PVs, so update each PV while removing each VG, to ensure
# the properties are current and correct.
p.update()
if p.Pv.Vg != '/':
v = ClientProxy(self.bus, p.Pv.Vg, interfaces=(VG_INT, ))
self._recurse_vg_delete(v, p, nested_pvs)
# Check to make sure the PVs we had to start exist, else re-create
# them
self.objs, self.bus = get_objects()
if len(self.pvs) != len(self.objs[PV_INT]):
for p in self.pvs:
found = False
for pc in self.objs[PV_INT]:
if pc.Pv.Name == p:
found = True
break
if not found:
# print('Re-creating PV=', p)
self._pv_create(p)
def _check_consistency(self):
# Only do consistency checks if we aren't running the unit tests
# concurrently
if pv_device_list is None:
self.assertEqual(self._refresh(), 0)
def handle_return(self, rc):
if isinstance(rc, (tuple, list)):
# We have a tuple returned
if rc[0] != '/':
return rc[0]
else:
return self._wait_for_job(rc[1])
else:
if rc == '/':
return rc
else:
return self._wait_for_job(rc)
def _pv_create(self, device):
pv_path = self.handle_return(
self.objs[MANAGER_INT][0].Manager.PvCreate(
dbus.String(device), dbus.Int32(g_tmo), EOD)
)
self._validate_lookup(device, pv_path)
self.assertTrue(pv_path is not None and len(pv_path) > 0)
return pv_path
def _manager(self):
return self.objs[MANAGER_INT][0]
def _refresh(self):
return self._manager().Manager.Refresh()
def test_refresh(self):
self._check_consistency()
def test_version(self):
rc = self.objs[MANAGER_INT][0].Manager.Version
self.assertTrue(rc is not None and len(rc) > 0)
self._check_consistency()
def _vg_create(self, pv_paths=None, vg_prefix=None):
if not pv_paths:
pv_paths = self._all_pv_object_paths()
vg_name = vg_n(prefix=vg_prefix)
vg_path = self.handle_return(
self.objs[MANAGER_INT][0].Manager.VgCreate(
dbus.String(vg_name),
dbus.Array(pv_paths, signature=dbus.Signature('o')),
dbus.Int32(g_tmo),
EOD))
self._validate_lookup(vg_name, vg_path)
self.assertTrue(vg_path is not None and len(vg_path) > 0)
intf = [VG_INT, ]
if self.vdo:
intf.append(VG_VDO_INT)
return ClientProxy(self.bus, vg_path, interfaces=intf)
def test_vg_create(self):
self._vg_create()
self._check_consistency()
def test_vg_delete(self):
vg = self._vg_create().Vg
self.handle_return(
vg.Remove(dbus.Int32(g_tmo), EOD))
self._check_consistency()
def _pv_remove(self, pv):
rc = self.handle_return(
pv.Pv.Remove(dbus.Int32(g_tmo), EOD))
return rc
def test_pv_remove_add(self):
target = self.objs[PV_INT][0]
# Remove the PV
rc = self._pv_remove(target)
self.assertTrue(rc == '/')
self._check_consistency()
# Add it back
rc = self._pv_create(target.Pv.Name)[0]
self.assertTrue(rc == '/')
self._check_consistency()
def _create_raid5_thin_pool(self, vg=None):
meta_name = "meta_r5"
data_name = "data_r5"
if not vg:
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv_meta_path = self.handle_return(
vg.LvCreateRaid(
dbus.String(meta_name),
dbus.String("raid5"),
dbus.UInt64(mib(4)),
dbus.UInt32(0),
dbus.UInt32(0),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, meta_name), lv_meta_path)
lv_data_path = self.handle_return(
vg.LvCreateRaid(
dbus.String(data_name),
dbus.String("raid5"),
dbus.UInt64(mib(16)),
dbus.UInt32(0),
dbus.UInt32(0),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, data_name), lv_data_path)
thin_pool_path = self.handle_return(
vg.CreateThinPool(
dbus.ObjectPath(lv_meta_path),
dbus.ObjectPath(lv_data_path),
dbus.Int32(g_tmo), EOD)
)
# Get thin pool client proxy
intf = (LV_COMMON_INT, LV_INT, THINPOOL_INT)
thin_pool = ClientProxy(self.bus, thin_pool_path, interfaces=intf)
return vg, thin_pool
def test_meta_lv_data_lv_props(self):
# Ensure that metadata lv and data lv for thin pools and cache pools
# point to a valid LV
(vg, thin_pool) = self._create_raid5_thin_pool()
# Check properties on thin pool
self.assertTrue(thin_pool.ThinPool.DataLv != '/')
self.assertTrue(thin_pool.ThinPool.MetaDataLv != '/')
(vg, cache_pool) = self._create_cache_pool(vg)
self.assertTrue(cache_pool.CachePool.DataLv != '/')
self.assertTrue(cache_pool.CachePool.MetaDataLv != '/')
# Cache the thin pool
cached_thin_pool_path = self.handle_return(
cache_pool.CachePool.CacheLv(
dbus.ObjectPath(thin_pool.object_path),
dbus.Int32(g_tmo), EOD)
)
# Get object proxy for cached thin pool
intf = (LV_COMMON_INT, LV_INT, THINPOOL_INT)
cached_thin_pool_object = ClientProxy(
self.bus, cached_thin_pool_path, interfaces=intf)
# Check properties on cache pool
self.assertTrue(cached_thin_pool_object.ThinPool.DataLv != '/')
self.assertTrue(cached_thin_pool_object.ThinPool.MetaDataLv != '/')
def _lookup(self, lvm_id):
return self.objs[MANAGER_INT][0].\
Manager.LookUpByLvmId(dbus.String(lvm_id))
def _validate_lookup(self, lvm_name, object_path):
t = self._lookup(lvm_name)
self.assertTrue(
object_path == t, "%s != %s for %s" % (object_path, t, lvm_name))
def test_lookup_by_lvm_id(self):
# For the moment lets just lookup what we know about which is PVs
# When we start testing VGs and LVs we will test lookups for those
# during those unit tests
for p in self.objs[PV_INT]:
rc = self._lookup(p.Pv.Name)
self.assertTrue(rc is not None and rc != '/')
# Search for something which doesn't exist
rc = self._lookup('/dev/null')
self.assertTrue(rc == '/')
def test_vg_extend(self):
# Create a VG
self.assertTrue(len(self.objs[PV_INT]) >= 2)
if len(self.objs[PV_INT]) >= 2:
pv_initial = self.objs[PV_INT][0]
pv_next = self.objs[PV_INT][1]
vg = self._vg_create([pv_initial.object_path]).Vg
path = self.handle_return(
vg.Extend(
dbus.Array([pv_next.object_path], signature="o"),
dbus.Int32(g_tmo), EOD)
)
self.assertTrue(path == '/')
self._check_consistency()
# noinspection PyUnresolvedReferences
def test_vg_reduce(self):
self.assertTrue(len(self.objs[PV_INT]) >= 2)
if len(self.objs[PV_INT]) >= 2:
vg = self._vg_create(
[self.objs[PV_INT][0].object_path,
self.objs[PV_INT][1].object_path]).Vg
path = self.handle_return(
vg.Reduce(
dbus.Boolean(False), dbus.Array([vg.Pvs[0]], signature='o'),
dbus.Int32(g_tmo), EOD)
)
self.assertTrue(path == '/')
self._check_consistency()
def _verify_lv_paths(self, vg, new_name):
"""
# Go through each LV and make sure it has the correct path back to the
# VG
:return:
"""
lv_paths = vg.Lvs
for l in lv_paths:
lv_proxy = ClientProxy(
self.bus, l, interfaces=(LV_COMMON_INT,)).LvCommon
self.assertTrue(
lv_proxy.Vg == vg.object_path, "%s != %s" %
(lv_proxy.Vg, vg.object_path))
full_name = "%s/%s" % (new_name, lv_proxy.Name)
lv_path = self._lookup(full_name)
self.assertTrue(
lv_path == lv_proxy.object_path, "%s != %s" %
(lv_path, lv_proxy.object_path))
# noinspection PyUnresolvedReferences
def test_vg_rename(self):
vg = self._vg_create().Vg
# Do a vg lookup
path = self._lookup(vg.Name)
vg_name_start = vg.Name
prev_path = path
self.assertTrue(path != '/', "%s" % (path))
# Create some LVs in the VG
for i in range(0, 5):
lv_t = self._create_lv(size=mib(4), vg=vg)
full_name = "%s/%s" % (vg_name_start, lv_t.LvCommon.Name)
lv_path = self._lookup(full_name)
self.assertTrue(lv_path == lv_t.object_path)
new_name = 'renamed_' + vg.Name
path = self.handle_return(
vg.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
self.assertTrue(path == '/')
self._check_consistency()
# Do a vg lookup
path = self._lookup(new_name)
self.assertTrue(path != '/', "%s" % (path))
self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path))
# Go through each LV and make sure it has the correct path back to the
# VG
vg.update()
self.assertTrue(len(vg.Lvs) == 5)
self._verify_lv_paths(vg, new_name)
def _verify_hidden_lookups(self, lv_common_object, vgname):
hidden_lv_paths = lv_common_object.HiddenLvs
for h in hidden_lv_paths:
h_lv = ClientProxy(
self.bus, h, interfaces=(LV_COMMON_INT,)).LvCommon
if len(h_lv.HiddenLvs) > 0:
self._verify_hidden_lookups(h_lv, vgname)
full_name = "%s/%s" % (vgname, h_lv.Name)
# print("Hidden check %s" % (full_name))
lookup_path = self._lookup(full_name)
self.assertTrue(lookup_path != '/')
self.assertTrue(lookup_path == h_lv.object_path)
# Lets's strip off the '[ ]' and make sure we can find
full_name = "%s/%s" % (vgname, h_lv.Name[1:-1])
# print("Hidden check %s" % (full_name))
lookup_path = self._lookup(full_name)
self.assertTrue(lookup_path != '/')
self.assertTrue(lookup_path == h_lv.object_path)
def test_vg_rename_with_thin_pool(self):
(vg, thin_pool) = self._create_raid5_thin_pool()
vg_name_start = vg.Name
# noinspection PyTypeChecker
self._verify_hidden_lookups(thin_pool.LvCommon, vg_name_start)
for i in range(0, 5):
lv_name = lv_n()
thin_lv_path = self.handle_return(
thin_pool.ThinPool.LvCreate(
dbus.String(lv_name),
dbus.UInt64(mib(16)),
dbus.Int32(g_tmo),
EOD))
self._validate_lookup(
"%s/%s" % (vg_name_start, lv_name), thin_lv_path)
self.assertTrue(thin_lv_path != '/')
full_name = "%s/%s" % (vg_name_start, lv_name)
lookup_lv_path = self._lookup(full_name)
self.assertTrue(
thin_lv_path == lookup_lv_path,
"%s != %s" % (thin_lv_path, lookup_lv_path))
# Rename the VG
new_name = 'renamed_' + vg.Name
path = self.handle_return(
vg.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
self.assertTrue(path == '/')
self._check_consistency()
vg.update()
thin_pool.update()
self._verify_lv_paths(vg, new_name)
# noinspection PyTypeChecker
self._verify_hidden_lookups(thin_pool.LvCommon, new_name)
def _test_lv_create(self, method, params, vg, proxy_interfaces=None):
lv = None
path = self.handle_return(method(*params))
self.assertTrue(vg)
if path:
lv = ClientProxy(self.bus, path, interfaces=proxy_interfaces)
# We are quick enough now that we can get VolumeType changes from
# 'I' to 'i' between the time it takes to create a RAID and it returns
# and when we refresh state here. Not sure how we can handle this as
# we cannot just sit and poll all the time for changes...
# self._check_consistency()
return lv
def test_lv_create(self):
lv_name = lv_n()
vg = self._vg_create().Vg
lv = self._test_lv_create(
vg.LvCreate,
(dbus.String(lv_name), dbus.UInt64(mib(4)),
dbus.Array([], signature='(ott)'), dbus.Int32(g_tmo),
EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def test_lv_create_job(self):
lv_name = lv_n()
vg = self._vg_create().Vg
(object_path, job_path) = vg.LvCreate(
dbus.String(lv_name), dbus.UInt64(mib(4)),
dbus.Array([], signature='(ott)'), dbus.Int32(0),
EOD)
self.assertTrue(object_path == '/')
self.assertTrue(job_path != '/')
object_path = self._wait_for_job(job_path)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), object_path)
self.assertTrue(object_path != '/')
def test_lv_create_linear(self):
lv_name = lv_n()
vg = self._vg_create().Vg
lv = self._test_lv_create(
vg.LvCreateLinear,
(dbus.String(lv_name), dbus.UInt64(mib(4)), dbus.Boolean(False),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def _all_pv_object_paths(self):
return [pp.object_path for pp in self.objs[PV_INT]]
def test_lv_create_striped(self):
lv_name = lv_n()
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._test_lv_create(
vg.LvCreateStriped,
(dbus.String(lv_name), dbus.UInt64(mib(4)),
dbus.UInt32(2), dbus.UInt32(8), dbus.Boolean(False),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def test_lv_create_mirror(self):
lv_name = lv_n()
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._test_lv_create(
vg.LvCreateMirror,
(dbus.String(lv_name), dbus.UInt64(mib(4)), dbus.UInt32(2),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def test_lv_create_raid(self):
lv_name = lv_n()
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._test_lv_create(
vg.LvCreateRaid,
(dbus.String(lv_name), dbus.String('raid5'), dbus.UInt64(mib(16)),
dbus.UInt32(2), dbus.UInt32(8), dbus.Int32(g_tmo), EOD),
vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def _create_lv(self, thinpool=False, size=None, vg=None, suffix=None):
lv_name = lv_n(suffix=suffix)
interfaces = list(LV_BASE_INT)
if thinpool:
interfaces.append(THINPOOL_INT)
if not vg:
vg = self._vg_create(self._all_pv_object_paths()).Vg
if size is None:
size = mib(8)
lv = self._test_lv_create(
vg.LvCreateLinear,
(dbus.String(lv_name), dbus.UInt64(size),
dbus.Boolean(thinpool), dbus.Int32(g_tmo), EOD),
vg, interfaces)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
return lv
def _create_thin_pool_lv(self):
return self._create_lv(True)
def test_lv_create_rounding(self):
self._create_lv(size=(mib(2) + 13))
def test_lv_create_thin_pool(self):
self._create_thin_pool_lv()
def _rename_lv_test(self, lv):
path = self._lookup(lv.LvCommon.Name)
prev_path = path
new_name = 'renamed_' + lv.LvCommon.Name
self.handle_return(
lv.Lv.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
path = self._lookup(new_name)
self._check_consistency()
self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path))
lv.update()
self.assertTrue(
lv.LvCommon.Name == new_name,
"%s != %s" % (lv.LvCommon.Name, new_name))
def test_lv_rename(self):
# Rename a regular LV
lv = self._create_lv()
self._rename_lv_test(lv)
def test_lv_thinpool_rename(self):
# Rename a thin pool
tp = self._create_lv(True)
self.assertTrue(
THINPOOL_LV_PATH in tp.object_path,
"%s" % (tp.object_path))
new_name = 'renamed_' + tp.LvCommon.Name
self.handle_return(tp.Lv.Rename(
dbus.String(new_name), dbus.Int32(g_tmo), EOD))
tp.update()
self._check_consistency()
self.assertEqual(new_name, tp.LvCommon.Name)
def _create_thin_lv(self):
vg = self._vg_create().Vg
tp = self._create_lv(thinpool=True, vg=vg)
lv_name = lv_n('_thin_lv')
thin_path = self.handle_return(
tp.ThinPool.LvCreate(
dbus.String(lv_name),
dbus.UInt64(mib(10)),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), thin_path)
lv = ClientProxy(
self.bus, thin_path, interfaces=(LV_COMMON_INT, LV_INT))
return vg, thin_path, lv
# noinspection PyUnresolvedReferences
def test_lv_on_thin_pool_rename(self):
# Rename a LV on a thin Pool
vg, thin_path, lv = self._create_thin_lv()
re_named = 'rename_test' + lv.LvCommon.Name
rc = self.handle_return(
lv.Lv.Rename(
dbus.String(re_named),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, re_named), thin_path)
self.assertTrue(rc == '/')
self._check_consistency()
def _lv_remove(self, lv):
rc = self.handle_return(
lv.Lv.Remove(
dbus.Int32(g_tmo),
EOD))
self.assertTrue(rc == '/')
self._check_consistency()
def test_lv_remove(self):
lv = self._create_lv()
self._lv_remove(lv)
def _take_lv_snapshot(self, lv_p):
ss_name = 'ss_' + lv_p.LvCommon.Name
ss_obj_path = self.handle_return(lv_p.Lv.Snapshot(
dbus.String(ss_name),
dbus.UInt64(0),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(ss_obj_path != '/')
return ClientProxy(
self.bus, ss_obj_path, interfaces=(LV_COMMON_INT, LV_INT))
def test_lv_snapshot(self):
lv_p = self._create_lv()
self._take_lv_snapshot(lv_p)
# noinspection PyUnresolvedReferences,PyUnusedLocal
def _wait_for_job(self, j_path):
rc = None
j = ClientProxy(self.bus, j_path, interfaces=(JOB_INT, )).Job
while True:
j.update()
if j.Complete:
(ec, error_msg) = j.GetError
self.assertTrue(ec == 0, "%d :%s" % (ec, error_msg))
if ec == 0:
self.assertTrue(j.Percent == 100, "P= %f" % j.Percent)
rc = j.Result
j.Remove()
break
else:
# Most of the time we will get this exception as expected, but there is
# a race condition between checking if it's complete and removing it (we want to try to remove while
# it's not complete to raise the exception)
# Thus, we can't reliably use self.assertRaises.
# We have included it here to test this path in the daemon.
try:
j.Remove()
except dbus.exceptions.DBusException:
pass
if j.Wait(1):
self.assertTrue(j.Wait(0))
j.update()
self.assertTrue(j.Complete)
return rc
def test_lv_create_pv_specific(self):
vg = self._vg_create().Vg
lv_name = lv_n()
pv = vg.Pvs
pvp = ClientProxy(self.bus, pv[0], interfaces=(PV_INT,))
lv = self._test_lv_create(
vg.LvCreate, (
dbus.String(lv_name),
dbus.UInt64(mib(4)),
dbus.Array(
[[pvp.object_path, 0, (pvp.Pv.PeCount - 1)]],
signature='(ott)'),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def _test_lv_resize(self, lv):
# Can't resize cache or thin pool volumes or vdo pool lv
if lv.LvCommon.Attr[0] == 'C' or lv.LvCommon.Attr[0] == 't' or \
lv.LvCommon.Attr[0] == 'd':
return
vg = ClientProxy(self.bus, lv.LvCommon.Vg, interfaces=(VG_INT,)).Vg
start_size = lv.LvCommon.SizeBytes
# Vdo are fairly big and need large re-size amounts.
if start_size > mib(4) * 3:
delta = mib(4)
else:
delta = 16384
for size in [start_size + delta, start_size - delta]:
pv_in_use = [i[0] for i in lv.LvCommon.Devices]
# Select a PV in the VG that isn't in use
pv_empty = [p for p in vg.Pvs if p not in pv_in_use]
prev = lv.LvCommon.SizeBytes
if len(pv_empty):
p = ClientProxy(self.bus, pv_empty[0], interfaces=(PV_INT,))
rc = self.handle_return(
lv.Lv.Resize(
dbus.UInt64(size),
dbus.Array(
[[p.object_path, 0, p.Pv.PeCount - 1]], '(oii)'),
dbus.Int32(g_tmo), EOD))
else:
rc = self.handle_return(
lv.Lv.Resize(
dbus.UInt64(size),
dbus.Array([], '(oii)'),
dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
self._check_consistency()
lv.update()
if prev < size:
self.assertTrue(lv.LvCommon.SizeBytes > prev)
else:
# We are testing re-sizing to same size too...
self.assertTrue(lv.LvCommon.SizeBytes <= prev)
def test_lv_resize(self):
pv_paths = [
self.objs[PV_INT][0].object_path, self.objs[PV_INT][1].object_path]
vg = self._vg_create(pv_paths).Vg
lv = self._create_lv(vg=vg, size=mib(16))
self._test_lv_resize(lv)
def test_lv_resize_same(self):
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._create_lv(vg=vg)
with self.assertRaises(dbus.exceptions.DBusException):
lv.Lv.Resize(
dbus.UInt64(lv.LvCommon.SizeBytes),
dbus.Array([], '(oii)'),
dbus.Int32(-1), EOD)
def test_lv_move(self):
lv = self._create_lv()
pv_path_move = str(lv.LvCommon.Devices[0][0])
# Test moving a specific LV
rc = self.handle_return(
lv.Lv.Move(
dbus.ObjectPath(pv_path_move),
dbus.Struct((0, 0), signature='(tt)'),
dbus.Array([], '(ott)'), dbus.Int32(g_tmo),
EOD))
self.assertTrue(rc == '/')
self._check_consistency()
lv.update()
new_pv = str(lv.LvCommon.Devices[0][0])
self.assertTrue(
pv_path_move != new_pv, "%s == %s" % (pv_path_move, new_pv))
def _test_activate_deactivate(self, lv_p):
self.handle_return(lv_p.Lv.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
lv_p.update()
self.assertFalse(lv_p.LvCommon.Active)
self._check_consistency()
self.handle_return(lv_p.Lv.Activate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
lv_p.update()
self.assertTrue(lv_p.LvCommon.Active)
# Vdo property "IndexState" when getting activated goes from
# "opening" -> "online" after we have returned from the activate call
# thus when we try to check the consistency we fail as the property
# is changing on it's own and not because the lvmdbusd failed to
# refresh it's own state. One solution is to not expose IndexState as
# a property.
# TODO Expose method to determine if Lv is partaking in VDO.
vg = ClientProxy(self.bus, lv_p.LvCommon.Vg, interfaces=(VG_INT,))
if "vdo" not in vg.Vg.Name:
self._check_consistency()
# Try control flags
for i in range(0, 5):
self.handle_return(lv_p.Lv.Activate(
dbus.UInt64(1 << i),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(lv_p.LvCommon.Active)
self._check_consistency()
def test_lv_activate_deactivate(self):
lv_p = self._create_lv()
self._test_activate_deactivate(lv_p)
def test_move(self):
lv = self._create_lv()
# Test moving without being LV specific
vg = ClientProxy(self.bus, lv.LvCommon.Vg, interfaces=(VG_INT, )).Vg
pv_to_move = str(lv.LvCommon.Devices[0][0])
rc = self.handle_return(
vg.Move(
dbus.ObjectPath(pv_to_move),
dbus.Struct((0, 0), signature='tt'),
dbus.Array([], '(ott)'),
dbus.Int32(0),
EOD))
self.assertEqual(rc, '/')
self._check_consistency()
vg.update()
lv.update()
location = lv.LvCommon.Devices[0][0]
dst = None
for p in vg.Pvs:
if p != location:
dst = p
# Fetch the destination
pv = ClientProxy(self.bus, dst, interfaces=(PV_INT, )).Pv
# Test range, move it to the middle of the new destination
job = self.handle_return(
vg.Move(
dbus.ObjectPath(location),
dbus.Struct((0, 0), signature='tt'),
dbus.Array([(dst, pv.PeCount // 2, 0), ], '(ott)'),
dbus.Int32(g_tmo),
EOD))
self.assertEqual(job, '/')
self._check_consistency()
def test_job_handling(self):
pv_paths = self._all_pv_object_paths()
vg_name = vg_n()
# Test getting a job right away
vg_path, vg_job = self.objs[MANAGER_INT][0].Manager.VgCreate(
dbus.String(vg_name),
dbus.Array(pv_paths, 'o'),
dbus.Int32(0),
EOD)
self.assertTrue(vg_path == '/')
self.assertTrue(vg_job and len(vg_job) > 0)
vg_path = self._wait_for_job(vg_job)
self._validate_lookup(vg_name, vg_path)
def _test_expired_timer(self, num_lvs):
rc = False
# In small configurations lvm is pretty snappy, so let's create a VG
# add a number of LVs and then remove the VG and all the contained
# LVs which appears to consistently run a little slow.
vg_proxy = self._vg_create(self._all_pv_object_paths())
for i in range(0, num_lvs):
lv_name = lv_n()
vg_proxy.update()
if vg_proxy.Vg.FreeCount > 0:
lv_path = self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(lv_name),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(lv_path != '/')
self._validate_lookup(
"%s/%s" % (vg_proxy.Vg.Name, lv_name), lv_path)
else:
# We ran out of space, test will probably fail
break
# Make sure that we are honoring the timeout
start = time.time()
remove_job = vg_proxy.Vg.Remove(dbus.Int32(1), EOD)
end = time.time()
tt_remove = float(end) - float(start)
self.assertTrue(tt_remove < 2.0, "remove time %s" % (str(tt_remove)))
# Depending on how long it took we could finish either way
if remove_job != '/':
# We got a job
result = self._wait_for_job(remove_job)
self.assertTrue(result == '/')
rc = True
else:
# It completed before timer popped
pass
return rc
# noinspection PyUnusedLocal
def test_job_handling_timer(self):
yes = False
for pp in self.objs[PV_INT]:
if '/dev/sd' not in pp.Pv.Name:
std_err_print("Skipping test_job_handling_timer on loopback")
return
# This may not pass
for i in [128, 256]:
yes = self._test_expired_timer(i)
if yes:
break
std_err_print('Attempt (%d) failed, trying again...' % (i))
self.assertTrue(yes)
def test_pv_tags(self):
pvs = []
vg = self._vg_create(self._all_pv_object_paths()).Vg
# Get the PVs
for p in vg.Pvs:
pvs.append(ClientProxy(self.bus, p, interfaces=(PV_INT, )).Pv)
for tags_value in [['hello'], ['foo', 'bar']]:
rc = self.handle_return(
vg.PvTagsAdd(
dbus.Array(vg.Pvs, 'o'),
dbus.Array(tags_value, 's'),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(rc == '/')
for p in pvs:
p.update()
self.assertTrue(sorted(tags_value) == p.Tags)
rc = self.handle_return(
vg.PvTagsDel(
dbus.Array(vg.Pvs, 'o'),
dbus.Array(tags_value, 's'),
dbus.Int32(g_tmo),
EOD))
self.assertEqual(rc, '/')
for p in pvs:
p.update()
self.assertTrue([] == p.Tags)
def test_vg_tags(self):
vg = self._vg_create().Vg
t = ['Testing', 'tags']
self.handle_return(
vg.TagsAdd(
dbus.Array(t, 's'),
dbus.Int32(g_tmo),
EOD))
vg.update()
self.assertTrue(t == vg.Tags)
self.handle_return(
vg.TagsDel(
dbus.Array(t, 's'),
dbus.Int32(g_tmo),
EOD))
vg.update()
self.assertTrue([] == vg.Tags)
def _test_lv_tags(self, lv):
t = ['Testing', 'tags']
self.handle_return(
lv.Lv.TagsAdd(
dbus.Array(t, 's'), dbus.Int32(g_tmo), EOD))
self._check_consistency()
lv.update()
self.assertTrue(t == lv.LvCommon.Tags)
self.handle_return(
lv.Lv.TagsDel(
dbus.Array(t, 's'),
dbus.Int32(g_tmo),
EOD))
self._check_consistency()
lv.update()
self.assertTrue([] == lv.LvCommon.Tags)
def test_lv_tags(self):
vg = self._vg_create().Vg
lv = self._create_lv(vg=vg)
self._test_lv_tags(lv)
def test_vg_allocation_policy_set(self):
vg = self._vg_create().Vg
for p in ['anywhere', 'contiguous', 'cling', 'normal']:
rc = self.handle_return(
vg.AllocationPolicySet(
dbus.String(p), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
vg.update()
prop = getattr(vg, 'Alloc' + p.title())
self.assertTrue(prop)
def test_vg_max_pv(self):
vg = self._vg_create([self.objs[PV_INT][0].object_path]).Vg
for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]:
rc = self.handle_return(
vg.MaxPvSet(
dbus.UInt64(p), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
vg.update()
self.assertTrue(
vg.MaxPv == p,
"Expected %s != Actual %s" % (str(p), str(vg.MaxPv)))
def test_vg_max_lv(self):
vg = self._vg_create().Vg
for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]:
rc = self.handle_return(
vg.MaxLvSet(
dbus.UInt64(p), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
vg.update()
self.assertTrue(
vg.MaxLv == p,
"Expected %s != Actual %s" % (str(p), str(vg.MaxLv)))
def test_vg_uuid_gen(self):
vg = self._vg_create().Vg
prev_uuid = vg.Uuid
rc = self.handle_return(
vg.UuidGenerate(
dbus.Int32(g_tmo),
EOD))
self.assertEqual(rc, '/')
vg.update()
self.assertTrue(
vg.Uuid != prev_uuid,
"Expected %s != Actual %s" % (vg.Uuid, prev_uuid))
def test_vg_activate_deactivate(self):
vg = self._vg_create().Vg
self._create_lv(vg=vg)
vg.update()
rc = self.handle_return(
vg.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
self._check_consistency()
rc = self.handle_return(
vg.Activate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
self._check_consistency()
# Try control flags
for i in range(0, 5):
self.handle_return(
vg.Activate(
dbus.UInt64(1 << i),
dbus.Int32(g_tmo),
EOD))
def test_pv_resize(self):
self.assertTrue(len(self.objs[PV_INT]) > 0)
if len(self.objs[PV_INT]) > 0:
pv = ClientProxy(
self.bus, self.objs[PV_INT][0].object_path,
interfaces=(PV_INT, )).Pv
original_size = pv.SizeBytes
new_size = original_size // 2
self.handle_return(
pv.ReSize(
dbus.UInt64(new_size),
dbus.Int32(g_tmo),
EOD))
self._check_consistency()
pv.update()
self.assertTrue(pv.SizeBytes != original_size)
self.handle_return(
pv.ReSize(
dbus.UInt64(0),
dbus.Int32(g_tmo),
EOD))
self._check_consistency()
pv.update()
self.assertTrue(pv.SizeBytes == original_size)
def test_pv_allocation(self):
vg = self._vg_create(self._all_pv_object_paths()).Vg
pv = ClientProxy(self.bus, vg.Pvs[0], interfaces=(PV_INT, )).Pv
self.handle_return(
pv.AllocationEnabled(
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
pv.update()
self.assertFalse(pv.Allocatable)
self.handle_return(
pv.AllocationEnabled(
dbus.Boolean(True),
dbus.Int32(g_tmo),
EOD))
self.handle_return(
pv.AllocationEnabled(
dbus.Boolean(True),
dbus.Int32(g_tmo),
EOD))
pv.update()
self.assertTrue(pv.Allocatable)
self._check_consistency()
@staticmethod
def _get_devices():
context = pyudev.Context()
return context.list_devices(subsystem='block', MAJOR='8')
def _pv_scan(self, activate, cache, device_paths, major_minors):
mgr = self._manager().Manager
return self.handle_return(
mgr.PvScan(
dbus.Boolean(activate),
dbus.Boolean(cache),
dbus.Array(device_paths, 's'),
dbus.Array(major_minors, '(ii)'),
dbus.Int32(g_tmo),
EOD))
def test_pv_scan(self):
def major_minor(d):
return (int(d.properties['MAJOR']), int(d.properties['MINOR']))
devices = TestDbusService._get_devices()
self.assertEqual(self._pv_scan(False, True, [], []), '/')
self._check_consistency()
self.assertEqual(self._pv_scan(False, False, [], []), '/')
self._check_consistency()
block_path = [d.properties['DEVNAME'] for d in devices]
self.assertEqual(self._pv_scan(False, True, block_path, []), '/')
self._check_consistency()
mm = [major_minor(d) for d in devices]
self.assertEqual(self._pv_scan(False, True, block_path, mm), '/')
self._check_consistency()
self.assertEqual(self._pv_scan(False, True, [], mm), '/')
self._check_consistency()
@staticmethod
def _write_some_data(device_path, size):
blocks = int(size // 512)
block = bytearray(512)
for i in range(0, 512):
block[i] = i % 255
with open(device_path, mode='wb') as lv:
for i in range(0, blocks):
lv.write(block)
def test_snapshot_merge(self):
# Create a non-thin LV and merge it
ss_size = mib(8)
lv_p = self._create_lv(size=mib(16))
ss_name = lv_p.LvCommon.Name + '_snap'
snapshot_path = self.handle_return(
lv_p.Lv.Snapshot(
dbus.String(ss_name),
dbus.UInt64(ss_size),
dbus.Int32(g_tmo),
EOD))
intf = (LV_COMMON_INT, LV_INT, SNAPSHOT_INT, )
ss = ClientProxy(self.bus, snapshot_path, interfaces=intf)
# Write some data to snapshot so merge takes some time
TestDbusService._write_some_data(ss.LvCommon.Path, ss_size // 2)
job_path = self.handle_return(
ss.Snapshot.Merge(
dbus.Int32(g_tmo),
EOD))
self.assertEqual(job_path, '/')
def test_snapshot_merge_thin(self):
# Create a thin LV, snapshot it and merge it
_vg, _thin_path, lv_p = self._create_thin_lv()
ss_name = lv_p.LvCommon.Name + '_snap'
snapshot_path = self.handle_return(
lv_p.Lv.Snapshot(
dbus.String(ss_name),
dbus.UInt64(0),
dbus.Int32(g_tmo),
EOD))
intf = (LV_INT, LV_COMMON_INT, SNAPSHOT_INT)
ss = ClientProxy(self.bus, snapshot_path, interfaces=intf)
job_path = self.handle_return(
ss.Snapshot.Merge(
dbus.Int32(g_tmo), EOD)
)
self.assertTrue(job_path == '/')
def _create_cache_pool(self, vg=None):
if not vg:
vg = self._vg_create().Vg
md = self._create_lv(size=(mib(8)), vg=vg)
data = self._create_lv(size=(mib(8)), vg=vg)
cache_pool_path = self.handle_return(
vg.CreateCachePool(
dbus.ObjectPath(md.object_path),
dbus.ObjectPath(data.object_path),
dbus.Int32(g_tmo),
EOD))
intf = (CACHE_POOL_INT, )
cp = ClientProxy(self.bus, cache_pool_path, interfaces=intf)
return vg, cp
def test_cache_pool_create(self):
vg, cache_pool = self._create_cache_pool()
self.assertTrue(
'/com/redhat/lvmdbus1/CachePool' in cache_pool.object_path)
def _create_cache_lv(self, return_all=False):
vg, cache_pool = self._create_cache_pool()
lv_to_cache = self._create_lv(size=mib(32), vg=vg)
c_lv_path = self.handle_return(
cache_pool.CachePool.CacheLv(
dbus.ObjectPath(lv_to_cache.object_path),
dbus.Int32(g_tmo),
EOD))
intf = (LV_COMMON_INT, LV_INT, CACHE_LV_INT)
cached_lv = ClientProxy(self.bus, c_lv_path, interfaces=intf)
if return_all:
return vg, cache_pool, cached_lv
return cached_lv
def test_cache_lv_create(self):
for destroy_cache in [True, False]:
vg, _, cached_lv = self._create_cache_lv(True)
uncached_lv_path = self.handle_return(
cached_lv.CachedLv.DetachCachePool(
dbus.Boolean(destroy_cache),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(
'/com/redhat/lvmdbus1/Lv' in uncached_lv_path)
rc = self.handle_return(
vg.Remove(dbus.Int32(g_tmo), EOD))
self.assertTrue(rc == '/')
def test_cache_lv_rename(self):
"""
Make sure that if we rename a cache lv that we correctly handle the
internal state update.
:return:
"""
def verify_cache_lv_count():
cur_objs, _ = get_objects()
self.assertEqual(len(cur_objs[CACHE_LV_INT]), 2)
self._check_consistency()
cached_lv = self._create_cache_lv()
verify_cache_lv_count()
new_name = 'renamed_' + cached_lv.LvCommon.Name
self.handle_return(
cached_lv.Lv.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
verify_cache_lv_count()
def test_writecache_lv(self):
vg = self._vg_create().Vg
data_lv = self._create_lv(size=mib(16), vg=vg)
cache_lv = self._create_lv(size=mib(16), vg=vg)
# both LVs need to be inactive
self.handle_return(data_lv.Lv.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
data_lv.update()
self.handle_return(cache_lv.Lv.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
cache_lv.update()
cached_lv_path = self.handle_return(
cache_lv.Lv.WriteCacheLv(
dbus.ObjectPath(data_lv.object_path),
dbus.Int32(g_tmo),
EOD))
intf = (LV_COMMON_INT, LV_INT, CACHE_LV_INT)
cached_lv = ClientProxy(self.bus, cached_lv_path, interfaces=intf)
self.assertEqual(cached_lv.LvCommon.SegType, ["writecache"])
uncached_lv_path = self.handle_return(
cached_lv.CachedLv.DetachCachePool(
dbus.Boolean(True),
dbus.Int32(g_tmo),
EOD))
self.assertTrue('/com/redhat/lvmdbus1/Lv' in uncached_lv_path)
def test_vg_change(self):
vg_proxy = self._vg_create()
result = self.handle_return(vg_proxy.Vg.Change(
dbus.Int32(g_tmo),
dbus.Dictionary({'-a': 'ay'}, 'sv')))
self.assertTrue(result == '/')
result = self.handle_return(
vg_proxy.Vg.Change(
dbus.Int32(g_tmo),
dbus.Dictionary({'-a': 'n'}, 'sv')))
self.assertTrue(result == '/')
@staticmethod
def _invalid_vg_lv_name_characters():
bad_vg_lv_set = set(string.printable) - \
set(string.ascii_letters + string.digits + '.-_+')
return ''.join(bad_vg_lv_set)
def test_invalid_names(self):
mgr = self.objs[MANAGER_INT][0].Manager
# Pv device path
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.PvCreate(
dbus.String("/dev/space in name"),
dbus.Int32(g_tmo),
EOD))
# VG Name testing...
# Go through all bad characters
pv_paths = [self.objs[PV_INT][0].object_path]
bad_chars = TestDbusService._invalid_vg_lv_name_characters()
for c in bad_chars:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.VgCreate(
dbus.String("name%s" % (c)),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
# Bad names
for bad in [".", ".."]:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.VgCreate(
dbus.String(bad),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
# Exceed name length
for i in [128, 1024, 4096]:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.VgCreate(
dbus.String('T' * i),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
# Create a VG and try to create LVs with different bad names
vg_name = vg_n()
vg_path = self.handle_return(
mgr.VgCreate(
dbus.String(vg_name),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
self._validate_lookup(vg_name, vg_path)
vg_proxy = ClientProxy(self.bus, vg_path, interfaces=(VG_INT, ))
for c in bad_chars:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(lv_n() + c),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
for reserved in (
"_cdata", "_cmeta", "_corig", "_mimage", "_mlog",
"_pmspare", "_rimage", "_rmeta", "_tdata", "_tmeta",
"_vorigin", "_vdata"):
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(lv_n() + reserved),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
for reserved in ("snapshot", "pvmove"):
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(reserved + lv_n()),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
_ALLOWABLE_TAG_CH = string.ascii_letters + string.digits + "._-+/=!:&#"
def _invalid_tag_characters(self):
bad_tag_ch_set = set(string.printable) - set(self._ALLOWABLE_TAG_CH)
return ''.join(bad_tag_ch_set)
def test_invalid_tags(self):
vg_proxy = self._vg_create()
for c in self._invalid_tag_characters():
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.TagsAdd(
dbus.Array([c], 's'),
dbus.Int32(g_tmo),
EOD))
for c in self._invalid_tag_characters():
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.TagsAdd(
dbus.Array(["a%sb" % (c)], 's'),
dbus.Int32(g_tmo),
EOD))
def _tag_add_common(self, vg_proxy, tag):
tmp = self.handle_return(
vg_proxy.Vg.TagsAdd(
dbus.Array([tag], 's'),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(tmp == '/')
vg_proxy.update()
self.assertTrue(
tag in vg_proxy.Vg.Tags,
"%s not in %s" % (tag, str(vg_proxy.Vg.Tags)))
def test_tag_names(self):
vg_proxy = self._vg_create()
for i in range(1, 64):
tag = rs(i, "", self._ALLOWABLE_TAG_CH)
self._tag_add_common(vg_proxy, tag)
self.assertEqual(
i, len(vg_proxy.Vg.Tags),
"%d != %d" % (i, len(vg_proxy.Vg.Tags)))
def test_tag_regression(self):
vg_proxy = self._vg_create()
tag = '--h/K.6g0A4FOEatf3+k_nI/Yp&L_u2oy-=j649x:+dUcYWPEo6.IWT0c'
self._tag_add_common(vg_proxy, tag)
def _verify_existence(self, cmd, operation, resource_name):
ec, stdout, stderr = call_lvm(cmd)
if ec == 0:
path = self._lookup(resource_name)
self.assertTrue(path != '/')
else:
std_err_print(
"%s failed with stdout= %s, stderr= %s" %
(operation, stdout, stderr))
self.assertTrue(ec == 0, "%s exit code = %d" % (operation, ec))
def test_external_vg_create(self):
# We need to ensure that if a user creates something outside lvm
# dbus service that things are sequenced correctly so that if a dbus
# user calls into the service they will find the same information.
vg_name = vg_n()
# Get all the PV device paths
pv_device_paths = [p.Pv.Name for p in self.objs[PV_INT]]
cmd = ['vgcreate', vg_name]
cmd.extend(pv_device_paths)
self._verify_existence(cmd, cmd[0], vg_name)
def test_external_lv_create(self):
# Let's create a LV outside of service and see if we correctly handle
# its inclusion
vg = self._vg_create().Vg
lv_name = lv_n()
full_name = "%s/%s" % (vg.Name, lv_name)
cmd = ['lvcreate', '-L4M', '-n', lv_name, vg.Name]
self._verify_existence(cmd, cmd[0], full_name)
def test_external_pv_create(self):
# Let's create a PV outside of service and see if we correctly handle
# its inclusion
target = self.objs[PV_INT][0]
# Remove the PV
rc = self._pv_remove(target)
self.assertTrue(rc == '/')
self._check_consistency()
# Make sure the PV we removed no longer exists
self.assertTrue(self._lookup(target.Pv.Name) == '/')
# Add it back with external command line
cmd = ['pvcreate', target.Pv.Name]
self._verify_existence(cmd, cmd[0], target.Pv.Name)
def _create_nested(self, pv_object_path):
vg = self._vg_create([pv_object_path])
pv = ClientProxy(self.bus, pv_object_path, interfaces=(PV_INT,))
self.assertEqual(pv.Pv.Vg, vg.object_path)
self.assertIn(
pv_object_path, vg.Vg.Pvs, "Expecting PV object path in Vg.Pvs")
lv = self._create_lv(
vg=vg.Vg, size=vg.Vg.FreeBytes, suffix="_pv0")
device_path = '/dev/%s/%s' % (vg.Vg.Name, lv.LvCommon.Name)
new_pv_object_path = self._pv_create(device_path)
vg.update()
self.assertEqual(lv.LvCommon.Vg, vg.object_path)
self.assertIn(
lv.object_path, vg.Vg.Lvs, "Expecting LV object path in Vg.Lvs")
new_pv_proxy = ClientProxy(
self.bus, new_pv_object_path, interfaces=(PV_INT, ))
self.assertEqual(new_pv_proxy.Pv.Name, device_path)
return new_pv_object_path
def test_nesting(self):
# check to see if we handle an LV becoming a PV which has it's own
# LV
#
# NOTE: This needs an equivalent of aux extend_filter_LVMTEST
# when run from lvm2 testsuite. See dbustest.sh.
# Also, if developing locally with actual devices one can achieve this
# by editing lvm.conf with "devices/scan_lvs = 1" As testing
# typically utilizes loopback, this test is skipped in
# those environments.
if dm_dev_dir != '/dev':
raise unittest.SkipTest('test not running in real /dev')
pv_object_path = self.objs[PV_INT][0].object_path
if not self.objs[PV_INT][0].Pv.Name.startswith("/dev"):
raise unittest.SkipTest('test not running in /dev')
for i in range(0, 5):
pv_object_path = self._create_nested(pv_object_path)
def test_pv_symlinks(self):
# Let's take one of our test PVs, pvremove it, find a symlink to it
# and re-create using the symlink to ensure we return an object
# path to it. Additionally, we will take the symlink and do a lookup
# (Manager.LookUpByLvmId) using it and the original device path to
# ensure that we can find the PV.
symlink = None
pv = self.objs[PV_INT][0]
pv_device_path = pv.Pv.Name
if dm_dev_dir != '/dev':
raise unittest.SkipTest('test not running in real /dev')
if not pv_device_path.startswith("/dev"):
raise unittest.SkipTest('test not running in /dev')
self._pv_remove(pv)
# Make sure we no longer find the pv
rc = self._lookup(pv_device_path)
self.assertEqual(rc, '/')
# Let's locate a symlink for it
devices = glob('/dev/disk/*/*')
rp_pv_device_path = os.path.realpath(pv_device_path)
for d in devices:
if rp_pv_device_path == os.path.realpath(d):
symlink = d
break
self.assertIsNotNone(symlink, "We expected to find at least 1 symlink!")
# Make sure symlink look up fails too
rc = self._lookup(symlink)
self.assertEqual(rc, '/')
### pv_object_path = self._pv_create(symlink)
### Test is limited by filter rules and must use /dev/mapper/LVMTEST path
pv_object_path = self._pv_create(pv_device_path)
self.assertNotEqual(pv_object_path, '/')
pv_proxy = ClientProxy(self.bus, pv_object_path, interfaces=(PV_INT, ))
self.assertEqual(pv_proxy.Pv.Name, pv_device_path)
# Lets check symlink lookup
self.assertEqual(pv_object_path, self._lookup(pv_device_path))
def _create_vdo_pool_and_lv(self, vg_prefix="vdo_"):
pool_name = lv_n("_vdo_pool")
lv_name = lv_n()
vg_proxy = self._vg_create(vg_prefix=vg_prefix)
vdo_pool_object_path = self.handle_return(
vg_proxy.VgVdo.CreateVdoPoolandLv(
pool_name, lv_name,
dbus.UInt64(VDO_MIN_SIZE),
dbus.UInt64(VDO_MIN_SIZE * 2),
dbus.Int32(g_tmo),
EOD))
self.assertNotEqual(vdo_pool_object_path, "/")
self.assertEqual(
vdo_pool_object_path,
self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name)))
vdo_pool_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name))
self.assertNotEqual(vdo_pool_path, "/")
intf = [LV_COMMON_INT, LV_INT]
vdo_lv_obj_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, lv_name))
vdo_lv = ClientProxy(self.bus, vdo_lv_obj_path, interfaces=intf)
intf.append(VDOPOOL_INT)
vdo_pool_lv = ClientProxy(self.bus, vdo_pool_path, interfaces=intf)
return vg_proxy, vdo_pool_lv, vdo_lv
def _create_vdo_lv(self):
return self._create_vdo_pool_and_lv()[2]
def _vdo_pool_lv(self):
return self._create_vdo_pool_and_lv()[1]
def test_vdo_pool_create(self):
# Basic vdo sanity testing
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
# Do this twice to ensure we are providing the correct flags to force
# the operation when it finds an existing vdo signature, which likely
# shouldn't exist.
for _ in range(0, 2):
vg, _, _ = self._create_vdo_pool_and_lv()
self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
def _create_vdo_pool(self):
pool_name = lv_n('_vdo_pool')
lv_name = lv_n('_vdo_data')
vg_proxy = self._vg_create(vg_prefix="vdo_conv_")
lv = self._test_lv_create(
vg_proxy.Vg.LvCreate,
(dbus.String(pool_name), dbus.UInt64(VDO_MIN_SIZE),
dbus.Array([], signature='(ott)'), dbus.Int32(g_tmo),
EOD), vg_proxy.Vg, LV_BASE_INT)
lv_obj_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name))
self.assertNotEqual(lv_obj_path, "/")
vdo_pool_path = self.handle_return(
vg_proxy.VgVdo.CreateVdoPool(
dbus.ObjectPath(lv.object_path), lv_name,
dbus.UInt64(VDO_MIN_SIZE),
dbus.Int32(g_tmo),
EOD))
self.assertNotEqual(vdo_pool_path, "/")
self.assertEqual(
vdo_pool_path,
self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name)))
intf = [LV_COMMON_INT, LV_INT]
vdo_lv_obj_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, lv_name))
vdo_lv = ClientProxy(self.bus, vdo_lv_obj_path, interfaces=intf)
intf.append(VDOPOOL_INT)
vdo_pool_lv = ClientProxy(self.bus, vdo_pool_path, interfaces=intf)
return vg_proxy, vdo_pool_lv, vdo_lv
def test_vdo_pool_convert(self):
# Basic vdo sanity testing
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
vg, _pool, _lv = self._create_vdo_pool()
self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
def test_vdo_pool_compression_deduplication(self):
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
vg, pool, _lv = self._create_vdo_pool_and_lv(vg_prefix="vdo2_")
# compression and deduplication should be enabled by default
self.assertEqual(pool.VdoPool.Compression, "enabled")
self.assertEqual(pool.VdoPool.Deduplication, "enabled")
self.handle_return(
pool.VdoPool.DisableCompression(dbus.Int32(g_tmo), EOD))
self.handle_return(
pool.VdoPool.DisableDeduplication(dbus.Int32(g_tmo), EOD))
pool.update()
self.assertEqual(pool.VdoPool.Compression, "")
self.assertEqual(pool.VdoPool.Deduplication, "")
self.handle_return(
pool.VdoPool.EnableCompression(dbus.Int32(g_tmo), EOD))
self.handle_return(
pool.VdoPool.EnableDeduplication(dbus.Int32(g_tmo), EOD))
pool.update()
self.assertEqual(pool.VdoPool.Compression, "enabled")
self.assertEqual(pool.VdoPool.Deduplication, "enabled")
self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
def _test_lv_method_interface(self, lv):
self._rename_lv_test(lv)
self._test_activate_deactivate(lv)
self._test_lv_tags(lv)
self._test_lv_resize(lv)
def _test_lv_method_interface_sequence(
self, lv, test_ss=True, remove_lv=True):
self._test_lv_method_interface(lv)
# We can't take a snapshot of a pool lv (not yet).
if test_ss:
ss_lv = self._take_lv_snapshot(lv)
self._test_lv_method_interface(ss_lv)
self._lv_remove(ss_lv)
if remove_lv:
self._lv_remove(lv)
def test_lv_interface_plain_lv(self):
self._test_lv_method_interface_sequence(self._create_lv())
def test_lv_interface_vdo_lv(self):
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
self._test_lv_method_interface_sequence(self._create_vdo_lv())
def test_lv_interface_cache_lv(self):
self._test_lv_method_interface_sequence(
self._create_cache_lv(), remove_lv=False)
def test_lv_interface_thin_pool_lv(self):
self._test_lv_method_interface_sequence(
self._create_thin_pool_lv(), test_ss=False)
def test_lv_interface_vdo_pool_lv(self):
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
self._test_lv_method_interface_sequence(
self._vdo_pool_lv(), test_ss=False)
class AggregateResults(object):
def __init__(self):
self.no_errors = True
def register_result(self, result):
if not result.result.wasSuccessful():
self.no_errors = False
def register_fail(self):
self.no_errors = False
def exit_run(self):
if self.no_errors:
sys.exit(0)
sys.exit(1)
if __name__ == '__main__':
r = AggregateResults()
mode = int(test_shell)
if mode == 0:
std_err_print('\n*** Testing only lvm fork & exec test mode ***\n')
elif mode == 1:
std_err_print('\n*** Testing only lvm shell mode ***\n')
elif mode == 2:
std_err_print('\n*** Testing fork & exec & lvm shell mode ***\n')
else:
std_err_print("Unsupported \"LVM_DBUSD_TEST_MODE\"=%d, [0-2] valid" % mode)
sys.exit(1)
for g_tmo in [0, 15]:
std_err_print('Testing TMO=%d\n' % g_tmo)
if mode == 0:
if set_execution(False, r):
r.register_result(unittest.main(exit=False))
elif mode == 1:
if set_execution(True, r):
r.register_result(unittest.main(exit=False))
else:
if set_execution(False, r):
r.register_result(unittest.main(exit=False))
# Test lvm shell
if set_execution(True, r):
r.register_result(unittest.main(exit=False))
if not r.no_errors:
break
r.exit_run()