1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-01-03 05:18:29 +03:00
lvm2/test/dbus/lvmdbustest.py
Tony Asleson ea45ba753e lvmdbustest: Remove force exception in _wait_for_job
We put this in to test one of the paths in the damon, but unfortunately
if we hit the race condition where the job actually is done we will try
to call j.Wait(1) after the remove.  This fails with:

dbus.exceptions.DBusException: org.freedesktop.DBus.Error.UnknownMethod:
Method "Wait" with signature "i" on interface "com.redhat.lvmdbus1.Job"
doesn't exist

Which is caused by the dbus object no longer existing.  We could handle
this, but the issue is we no longer have the ability to get the result to
return, they have been lost.

A better solution would be to write a specific unit test to force this code
path and handle all the possible outcomes.
2022-09-16 10:49:36 -05:00

2129 lines
57 KiB
Python
Executable File

#!/usr/bin/python3
# Copyright (C) 2015-2016 Red Hat, Inc. All rights reserved.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# noinspection PyUnresolvedReferences
import dbus
# noinspection PyUnresolvedReferences
from dbus.mainloop.glib import DBusGMainLoop
import unittest
import pyudev
from testlib import *
import testlib
from subprocess import Popen, PIPE
from glob import glob
import os
g_tmo = 0
# Approx. min size
VDO_MIN_SIZE = mib(8192)
# Prefix on created objects to enable easier clean-up
g_prefix = os.getenv('PREFIX', '')
# Check dev dir prefix for test suite (LVM_TEST_DEVDIR
dm_dev_dir = os.getenv('DM_DEV_DIR', '/dev')
# Use the session bus instead of the system bus
use_session = os.getenv('LVM_DBUSD_USE_SESSION', False)
# Only use the devices listed in the ENV variable
pv_device_list = os.getenv('LVM_DBUSD_PV_DEVICE_LIST', None)
# Default is to test all modes
# 0 == Only test fork & exec mode
# 1 == Only test lvm shell mode
# 2 == Test both fork & exec & lvm shell mode (default)
# Other == Test just lvm shell mode
test_shell = os.getenv('LVM_DBUSD_TEST_MODE', 2)
# LVM binary to use
LVM_EXECUTABLE = os.getenv('LVM_BINARY', '/usr/sbin/lvm')
# Empty options dictionary (EOD)
EOD = dbus.Dictionary({}, signature=dbus.Signature('sv'))
# Base interfaces on LV objects
LV_BASE_INT = (LV_COMMON_INT, LV_INT)
if use_session:
bus = dbus.SessionBus(mainloop=DBusGMainLoop())
else:
bus = dbus.SystemBus(mainloop=DBusGMainLoop())
# If we have multiple clients we will globally disable introspection
# validation to limit the massive amount of introspection calls we make as
# that method prevents things from executing concurrently
if pv_device_list:
testlib.validate_introspection = False
def vg_n(prefix=None):
name = rs(8, '_vg')
if prefix:
name = prefix + name
return g_prefix + name
def lv_n(suffix=None):
if not suffix:
s = '_lv'
else:
s = suffix
return rs(8, s)
def _is_testsuite_pv(pv_name):
return g_prefix != "" and pv_name[-1].isdigit() and \
pv_name[:-1].endswith(g_prefix + "pv")
def is_nested_pv(pv_name):
return pv_name.count('/') == 3 and not _is_testsuite_pv(pv_name)
def _root_pv_name(res, pv_name):
if not is_nested_pv(pv_name):
return pv_name
vg_name = pv_name.split('/')[2]
for v in res[VG_INT]:
if v.Vg.Name == vg_name:
for pv in res[PV_INT]:
if pv.object_path in v.Vg.Pvs:
return _root_pv_name(res, pv.Pv.Name)
return None
def _prune_lvs(res, interface, vg_object_path):
lvs = [lv for lv in res[interface] if lv.LvCommon.Vg == vg_object_path]
res[interface] = lvs
def _prune(res, pv_filter):
if pv_filter:
pv_lookup = {}
pv_list = []
for p in res[PV_INT]:
if _root_pv_name(res, p.Pv.Name) in pv_filter:
pv_list.append(p)
pv_lookup[p.object_path] = p
res[PV_INT] = pv_list
vg_list = []
for v in res[VG_INT]:
if v.Vg.Pvs[0] in pv_lookup:
vg_list.append(v)
for interface in \
[LV_INT, THINPOOL_INT, LV_COMMON_INT,
CACHE_POOL_INT, CACHE_LV_INT, VDOPOOL_INT]:
_prune_lvs(res, interface, v.object_path)
res[VG_INT] = vg_list
return res
def get_objects():
rc = {
MANAGER_INT: [], PV_INT: [], VG_INT: [], LV_INT: [],
THINPOOL_INT: [], JOB_INT: [], SNAPSHOT_INT: [], LV_COMMON_INT: [],
CACHE_POOL_INT: [], CACHE_LV_INT: [], VG_VDO_INT: [], VDOPOOL_INT: []}
object_manager_object = bus.get_object(
BUS_NAME, "/com/redhat/lvmdbus1", introspect=False)
manager_interface = dbus.Interface(
object_manager_object, "org.freedesktop.DBus.ObjectManager")
objects = manager_interface.GetManagedObjects()
for object_path, v in objects.items():
proxy = ClientProxy(bus, object_path, v)
for interface in v.keys():
rc[interface].append(proxy)
# At this point we have a full population of everything, we now need to
# prune the objects if we are filtering PVs with a sub selection.
return _prune(rc, pv_device_list), bus
def set_execution(lvmshell, test_result):
if lvmshell:
m = 'lvm shell (non-fork)'
else:
m = "forking & exec'ing"
lvm_manager = dbus.Interface(bus.get_object(
BUS_NAME, "/com/redhat/lvmdbus1/Manager", introspect=False),
"com.redhat.lvmdbus1.Manager")
rc = lvm_manager.UseLvmShell(lvmshell)
if rc:
std_err_print('Successfully changed execution mode to "%s"' % m)
else:
std_err_print('ERROR: Failed to change execution mode to "%s"' % m)
test_result.register_fail()
return rc
def call_lvm(command):
"""
Call lvm executable and return a tuple of exitcode, stdout, stderr
:param command: Command to execute
:type command: list
:returns (exitcode, stdout, stderr)
:rtype (int, str, str)
"""
# Prepend the full lvm executable so that we can run different versions
# in different locations on the same box
command.insert(0, LVM_EXECUTABLE)
process = Popen(
command, stdout=PIPE, stderr=PIPE, close_fds=True, env=os.environ)
out = process.communicate()
stdout_text = bytes(out[0]).decode("utf-8")
stderr_text = bytes(out[1]).decode("utf-8")
return process.returncode, stdout_text, stderr_text
def supports_vdo():
cmd = ['segtypes']
modprobe = Popen(["modprobe", "kvdo"], stdout=PIPE, stderr=PIPE, close_fds=True, env=os.environ)
modprobe.communicate()
if modprobe.returncode != 0:
return False
rc, out, err = call_lvm(cmd)
if rc != 0 or "vdo" not in out:
return False
return True
# noinspection PyUnresolvedReferences
class TestDbusService(unittest.TestCase):
def setUp(self):
# Because of the sensitive nature of running LVM tests we will only
# run if we have PVs and nothing else, so that we can be confident that
# we are not mucking with someones data on their system
self.objs, self.bus = get_objects()
if len(self.objs[PV_INT]) == 0:
std_err_print('No PVs present exiting!')
sys.exit(1)
if len(self.objs[MANAGER_INT]) != 1:
std_err_print('Expecting a manager object!')
sys.exit(1)
if len(self.objs[VG_INT]) != 0:
std_err_print('Expecting no VGs to exist!')
sys.exit(1)
self.pvs = []
for p in self.objs[PV_INT]:
self.pvs.append(p.Pv.Name)
self.vdo = supports_vdo()
def _recurse_vg_delete(self, vg_proxy, pv_proxy, nested_pv_hash):
for pv_device_name, t in nested_pv_hash.items():
vg_name = str(vg_proxy.Vg.Name)
if vg_name in pv_device_name:
self._recurse_vg_delete(t[0], t[1], nested_pv_hash)
break
vg_proxy.update()
self.handle_return(vg_proxy.Vg.Remove(dbus.Int32(g_tmo), EOD))
if is_nested_pv(pv_proxy.Pv.Name):
rc = self._pv_remove(pv_proxy)
self.assertTrue(rc == '/')
def tearDown(self):
# If we get here it means we passed setUp, so lets remove anything
# and everything that remains, besides the PVs themselves
self.objs, self.bus = get_objects()
# The self.objs[PV_INT] list only contains those which we should be
# mucking with, lets remove any embedded/nested PVs first, then proceed
# to walk the base PVs and remove the VGs
nested_pvs = {}
non_nested = []
for p in self.objs[PV_INT]:
if is_nested_pv(p.Pv.Name):
if p.Pv.Vg != '/':
v = ClientProxy(self.bus, p.Pv.Vg, interfaces=(VG_INT,))
nested_pvs[p.Pv.Name] = (v, p)
else:
# Nested PV with no VG, so just simply remove it!
self._pv_remove(p)
else:
non_nested.append(p)
for p in non_nested:
# When we remove a VG for a PV it could ripple across multiple
# PVs, so update each PV while removing each VG, to ensure
# the properties are current and correct.
p.update()
if p.Pv.Vg != '/':
v = ClientProxy(self.bus, p.Pv.Vg, interfaces=(VG_INT, ))
self._recurse_vg_delete(v, p, nested_pvs)
# Check to make sure the PVs we had to start exist, else re-create
# them
self.objs, self.bus = get_objects()
if len(self.pvs) != len(self.objs[PV_INT]):
for p in self.pvs:
found = False
for pc in self.objs[PV_INT]:
if pc.Pv.Name == p:
found = True
break
if not found:
# print('Re-creating PV=', p)
self._pv_create(p)
def _check_consistency(self):
# Only do consistency checks if we aren't running the unit tests
# concurrently
if pv_device_list is None:
self.assertEqual(self._refresh(), 0)
def handle_return(self, rc):
if isinstance(rc, (tuple, list)):
# We have a tuple returned
if rc[0] != '/':
return rc[0]
else:
return self._wait_for_job(rc[1])
else:
if rc == '/':
return rc
else:
return self._wait_for_job(rc)
def _pv_create(self, device):
pv_path = self.handle_return(
self.objs[MANAGER_INT][0].Manager.PvCreate(
dbus.String(device), dbus.Int32(g_tmo), EOD)
)
self._validate_lookup(device, pv_path)
self.assertTrue(pv_path is not None and len(pv_path) > 0)
return pv_path
def _manager(self):
return self.objs[MANAGER_INT][0]
def _refresh(self):
return self._manager().Manager.Refresh()
def test_refresh(self):
self._check_consistency()
def test_version(self):
rc = self.objs[MANAGER_INT][0].Manager.Version
self.assertTrue(rc is not None and len(rc) > 0)
self._check_consistency()
def _vg_create(self, pv_paths=None, vg_prefix=None):
if not pv_paths:
pv_paths = self._all_pv_object_paths()
vg_name = vg_n(prefix=vg_prefix)
vg_path = self.handle_return(
self.objs[MANAGER_INT][0].Manager.VgCreate(
dbus.String(vg_name),
dbus.Array(pv_paths, signature=dbus.Signature('o')),
dbus.Int32(g_tmo),
EOD))
self._validate_lookup(vg_name, vg_path)
self.assertTrue(vg_path is not None and len(vg_path) > 0)
intf = [VG_INT, ]
if self.vdo:
intf.append(VG_VDO_INT)
return ClientProxy(self.bus, vg_path, interfaces=intf)
def test_vg_create(self):
self._vg_create()
self._check_consistency()
def test_vg_delete(self):
vg = self._vg_create().Vg
self.handle_return(
vg.Remove(dbus.Int32(g_tmo), EOD))
self._check_consistency()
def _pv_remove(self, pv):
rc = self.handle_return(
pv.Pv.Remove(dbus.Int32(g_tmo), EOD))
return rc
def test_pv_remove_add(self):
target = self.objs[PV_INT][0]
# Remove the PV
rc = self._pv_remove(target)
self.assertTrue(rc == '/')
self._check_consistency()
# Add it back
rc = self._pv_create(target.Pv.Name)[0]
self.assertTrue(rc == '/')
self._check_consistency()
def _create_raid5_thin_pool(self, vg=None):
meta_name = "meta_r5"
data_name = "data_r5"
if not vg:
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv_meta_path = self.handle_return(
vg.LvCreateRaid(
dbus.String(meta_name),
dbus.String("raid5"),
dbus.UInt64(mib(4)),
dbus.UInt32(0),
dbus.UInt32(0),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, meta_name), lv_meta_path)
lv_data_path = self.handle_return(
vg.LvCreateRaid(
dbus.String(data_name),
dbus.String("raid5"),
dbus.UInt64(mib(16)),
dbus.UInt32(0),
dbus.UInt32(0),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, data_name), lv_data_path)
thin_pool_path = self.handle_return(
vg.CreateThinPool(
dbus.ObjectPath(lv_meta_path),
dbus.ObjectPath(lv_data_path),
dbus.Int32(g_tmo), EOD)
)
# Get thin pool client proxy
intf = (LV_COMMON_INT, LV_INT, THINPOOL_INT)
thin_pool = ClientProxy(self.bus, thin_pool_path, interfaces=intf)
return vg, thin_pool
def test_meta_lv_data_lv_props(self):
# Ensure that metadata lv and data lv for thin pools and cache pools
# point to a valid LV
(vg, thin_pool) = self._create_raid5_thin_pool()
# Check properties on thin pool
self.assertTrue(thin_pool.ThinPool.DataLv != '/')
self.assertTrue(thin_pool.ThinPool.MetaDataLv != '/')
(vg, cache_pool) = self._create_cache_pool(vg)
self.assertTrue(cache_pool.CachePool.DataLv != '/')
self.assertTrue(cache_pool.CachePool.MetaDataLv != '/')
# Cache the thin pool
cached_thin_pool_path = self.handle_return(
cache_pool.CachePool.CacheLv(
dbus.ObjectPath(thin_pool.object_path),
dbus.Int32(g_tmo), EOD)
)
# Get object proxy for cached thin pool
intf = (LV_COMMON_INT, LV_INT, THINPOOL_INT)
cached_thin_pool_object = ClientProxy(
self.bus, cached_thin_pool_path, interfaces=intf)
# Check properties on cache pool
self.assertTrue(cached_thin_pool_object.ThinPool.DataLv != '/')
self.assertTrue(cached_thin_pool_object.ThinPool.MetaDataLv != '/')
def _lookup(self, lvm_id):
return self.objs[MANAGER_INT][0].\
Manager.LookUpByLvmId(dbus.String(lvm_id))
def _validate_lookup(self, lvm_name, object_path):
t = self._lookup(lvm_name)
self.assertTrue(
object_path == t, "%s != %s for %s" % (object_path, t, lvm_name))
def test_lookup_by_lvm_id(self):
# For the moment lets just lookup what we know about which is PVs
# When we start testing VGs and LVs we will test lookups for those
# during those unit tests
for p in self.objs[PV_INT]:
rc = self._lookup(p.Pv.Name)
self.assertTrue(rc is not None and rc != '/')
# Search for something which doesn't exist
rc = self._lookup('/dev/null')
self.assertTrue(rc == '/')
def test_vg_extend(self):
# Create a VG
self.assertTrue(len(self.objs[PV_INT]) >= 2)
if len(self.objs[PV_INT]) >= 2:
pv_initial = self.objs[PV_INT][0]
pv_next = self.objs[PV_INT][1]
vg = self._vg_create([pv_initial.object_path]).Vg
path = self.handle_return(
vg.Extend(
dbus.Array([pv_next.object_path], signature="o"),
dbus.Int32(g_tmo), EOD)
)
self.assertTrue(path == '/')
self._check_consistency()
# noinspection PyUnresolvedReferences
def test_vg_reduce(self):
self.assertTrue(len(self.objs[PV_INT]) >= 2)
if len(self.objs[PV_INT]) >= 2:
vg = self._vg_create(
[self.objs[PV_INT][0].object_path,
self.objs[PV_INT][1].object_path]).Vg
path = self.handle_return(
vg.Reduce(
dbus.Boolean(False), dbus.Array([vg.Pvs[0]], signature='o'),
dbus.Int32(g_tmo), EOD)
)
self.assertTrue(path == '/')
self._check_consistency()
def _verify_lv_paths(self, vg, new_name):
"""
# Go through each LV and make sure it has the correct path back to the
# VG
:return:
"""
lv_paths = vg.Lvs
for l in lv_paths:
lv_proxy = ClientProxy(
self.bus, l, interfaces=(LV_COMMON_INT,)).LvCommon
self.assertTrue(
lv_proxy.Vg == vg.object_path, "%s != %s" %
(lv_proxy.Vg, vg.object_path))
full_name = "%s/%s" % (new_name, lv_proxy.Name)
lv_path = self._lookup(full_name)
self.assertTrue(
lv_path == lv_proxy.object_path, "%s != %s" %
(lv_path, lv_proxy.object_path))
# noinspection PyUnresolvedReferences
def test_vg_rename(self):
vg = self._vg_create().Vg
# Do a vg lookup
path = self._lookup(vg.Name)
vg_name_start = vg.Name
prev_path = path
self.assertTrue(path != '/', "%s" % (path))
# Create some LVs in the VG
for i in range(0, 5):
lv_t = self._create_lv(size=mib(4), vg=vg)
full_name = "%s/%s" % (vg_name_start, lv_t.LvCommon.Name)
lv_path = self._lookup(full_name)
self.assertTrue(lv_path == lv_t.object_path)
new_name = 'renamed_' + vg.Name
path = self.handle_return(
vg.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
self.assertTrue(path == '/')
self._check_consistency()
# Do a vg lookup
path = self._lookup(new_name)
self.assertTrue(path != '/', "%s" % (path))
self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path))
# Go through each LV and make sure it has the correct path back to the
# VG
vg.update()
self.assertTrue(len(vg.Lvs) == 5)
self._verify_lv_paths(vg, new_name)
def _verify_hidden_lookups(self, lv_common_object, vgname):
hidden_lv_paths = lv_common_object.HiddenLvs
for h in hidden_lv_paths:
h_lv = ClientProxy(
self.bus, h, interfaces=(LV_COMMON_INT,)).LvCommon
if len(h_lv.HiddenLvs) > 0:
self._verify_hidden_lookups(h_lv, vgname)
full_name = "%s/%s" % (vgname, h_lv.Name)
# print("Hidden check %s" % (full_name))
lookup_path = self._lookup(full_name)
self.assertTrue(lookup_path != '/')
self.assertTrue(lookup_path == h_lv.object_path)
# Lets's strip off the '[ ]' and make sure we can find
full_name = "%s/%s" % (vgname, h_lv.Name[1:-1])
# print("Hidden check %s" % (full_name))
lookup_path = self._lookup(full_name)
self.assertTrue(lookup_path != '/')
self.assertTrue(lookup_path == h_lv.object_path)
def test_vg_rename_with_thin_pool(self):
(vg, thin_pool) = self._create_raid5_thin_pool()
vg_name_start = vg.Name
# noinspection PyTypeChecker
self._verify_hidden_lookups(thin_pool.LvCommon, vg_name_start)
for i in range(0, 5):
lv_name = lv_n()
thin_lv_path = self.handle_return(
thin_pool.ThinPool.LvCreate(
dbus.String(lv_name),
dbus.UInt64(mib(16)),
dbus.Int32(g_tmo),
EOD))
self._validate_lookup(
"%s/%s" % (vg_name_start, lv_name), thin_lv_path)
self.assertTrue(thin_lv_path != '/')
full_name = "%s/%s" % (vg_name_start, lv_name)
lookup_lv_path = self._lookup(full_name)
self.assertTrue(
thin_lv_path == lookup_lv_path,
"%s != %s" % (thin_lv_path, lookup_lv_path))
# Rename the VG
new_name = 'renamed_' + vg.Name
path = self.handle_return(
vg.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
self.assertTrue(path == '/')
self._check_consistency()
vg.update()
thin_pool.update()
self._verify_lv_paths(vg, new_name)
# noinspection PyTypeChecker
self._verify_hidden_lookups(thin_pool.LvCommon, new_name)
def _test_lv_create(self, method, params, vg, proxy_interfaces=None):
lv = None
path = self.handle_return(method(*params))
self.assertTrue(vg)
if path:
lv = ClientProxy(self.bus, path, interfaces=proxy_interfaces)
# We are quick enough now that we can get VolumeType changes from
# 'I' to 'i' between the time it takes to create a RAID and it returns
# and when we refresh state here. Not sure how we can handle this as
# we cannot just sit and poll all the time for changes...
# self._check_consistency()
return lv
def test_lv_create(self):
lv_name = lv_n()
vg = self._vg_create().Vg
lv = self._test_lv_create(
vg.LvCreate,
(dbus.String(lv_name), dbus.UInt64(mib(4)),
dbus.Array([], signature='(ott)'), dbus.Int32(g_tmo),
EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def test_prop_get(self):
lv_name = lv_n()
vg = self._vg_create().Vg
lv = self._test_lv_create(
vg.LvCreate,
(dbus.String(lv_name), dbus.UInt64(mib(4)),
dbus.Array([], signature='(ott)'), dbus.Int32(g_tmo),
EOD), vg, LV_BASE_INT)
ri = RemoteInterface(lv.dbus_object, interface=LV_COMMON_INT, introspect=False)
ri.update()
for prop_name in ri.get_property_names():
self.assertEqual(ri.get_property_value(prop_name), getattr(ri, prop_name))
def test_lv_create_job(self):
lv_name = lv_n()
vg = self._vg_create().Vg
(object_path, job_path) = vg.LvCreate(
dbus.String(lv_name), dbus.UInt64(mib(4)),
dbus.Array([], signature='(ott)'), dbus.Int32(0),
EOD)
self.assertTrue(object_path == '/')
self.assertTrue(job_path != '/')
object_path = self._wait_for_job(job_path)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), object_path)
self.assertTrue(object_path != '/')
def test_lv_create_linear(self):
lv_name = lv_n()
vg = self._vg_create().Vg
lv = self._test_lv_create(
vg.LvCreateLinear,
(dbus.String(lv_name), dbus.UInt64(mib(4)), dbus.Boolean(False),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def _all_pv_object_paths(self):
return [pp.object_path for pp in self.objs[PV_INT]]
def test_lv_create_striped(self):
lv_name = lv_n()
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._test_lv_create(
vg.LvCreateStriped,
(dbus.String(lv_name), dbus.UInt64(mib(4)),
dbus.UInt32(2), dbus.UInt32(8), dbus.Boolean(False),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def test_lv_create_mirror(self):
lv_name = lv_n()
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._test_lv_create(
vg.LvCreateMirror,
(dbus.String(lv_name), dbus.UInt64(mib(4)), dbus.UInt32(2),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def test_lv_create_raid(self):
lv_name = lv_n()
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._test_lv_create(
vg.LvCreateRaid,
(dbus.String(lv_name), dbus.String('raid5'), dbus.UInt64(mib(16)),
dbus.UInt32(2), dbus.UInt32(8), dbus.Int32(g_tmo), EOD),
vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def _create_lv(self, thinpool=False, size=None, vg=None, suffix=None):
lv_name = lv_n(suffix=suffix)
interfaces = list(LV_BASE_INT)
if thinpool:
interfaces.append(THINPOOL_INT)
if not vg:
vg = self._vg_create(self._all_pv_object_paths()).Vg
if size is None:
size = mib(8)
lv = self._test_lv_create(
vg.LvCreateLinear,
(dbus.String(lv_name), dbus.UInt64(size),
dbus.Boolean(thinpool), dbus.Int32(g_tmo), EOD),
vg, interfaces)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
return lv
def _create_thin_pool_lv(self):
return self._create_lv(True)
def test_lv_create_rounding(self):
self._create_lv(size=(mib(2) + 13))
def test_lv_create_thin_pool(self):
self._create_thin_pool_lv()
def _rename_lv_test(self, lv):
path = self._lookup(lv.LvCommon.Name)
prev_path = path
new_name = 'renamed_' + lv.LvCommon.Name
self.handle_return(
lv.Lv.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
path = self._lookup(new_name)
self._check_consistency()
self.assertTrue(prev_path == path, "%s != %s" % (prev_path, path))
lv.update()
self.assertTrue(
lv.LvCommon.Name == new_name,
"%s != %s" % (lv.LvCommon.Name, new_name))
def test_lv_rename(self):
# Rename a regular LV
lv = self._create_lv()
self._rename_lv_test(lv)
def test_lv_thinpool_rename(self):
# Rename a thin pool
tp = self._create_lv(True)
self.assertTrue(
THINPOOL_LV_PATH in tp.object_path,
"%s" % (tp.object_path))
new_name = 'renamed_' + tp.LvCommon.Name
self.handle_return(tp.Lv.Rename(
dbus.String(new_name), dbus.Int32(g_tmo), EOD))
tp.update()
self._check_consistency()
self.assertEqual(new_name, tp.LvCommon.Name)
def _create_thin_lv(self):
vg = self._vg_create().Vg
tp = self._create_lv(thinpool=True, vg=vg)
lv_name = lv_n('_thin_lv')
thin_path = self.handle_return(
tp.ThinPool.LvCreate(
dbus.String(lv_name),
dbus.UInt64(mib(10)),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), thin_path)
lv = ClientProxy(
self.bus, thin_path, interfaces=(LV_COMMON_INT, LV_INT))
return vg, thin_path, lv
# noinspection PyUnresolvedReferences
def test_lv_on_thin_pool_rename(self):
# Rename a LV on a thin Pool
vg, thin_path, lv = self._create_thin_lv()
re_named = 'rename_test' + lv.LvCommon.Name
rc = self.handle_return(
lv.Lv.Rename(
dbus.String(re_named),
dbus.Int32(g_tmo),
EOD)
)
self._validate_lookup("%s/%s" % (vg.Name, re_named), thin_path)
self.assertTrue(rc == '/')
self._check_consistency()
def _lv_remove(self, lv):
rc = self.handle_return(
lv.Lv.Remove(
dbus.Int32(g_tmo),
EOD))
self.assertTrue(rc == '/')
self._check_consistency()
def test_lv_remove(self):
lv = self._create_lv()
self._lv_remove(lv)
def _take_lv_snapshot(self, lv_p):
ss_name = 'ss_' + lv_p.LvCommon.Name
ss_obj_path = self.handle_return(lv_p.Lv.Snapshot(
dbus.String(ss_name),
dbus.UInt64(0),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(ss_obj_path != '/')
return ClientProxy(
self.bus, ss_obj_path, interfaces=(LV_COMMON_INT, LV_INT))
def test_lv_snapshot(self):
lv_p = self._create_lv()
self._take_lv_snapshot(lv_p)
# noinspection PyUnresolvedReferences,PyUnusedLocal
def _wait_for_job(self, j_path):
rc = None
j = ClientProxy(self.bus, j_path, interfaces=(JOB_INT, )).Job
while True:
j.update()
if j.Complete:
(ec, error_msg) = j.GetError
self.assertTrue(ec == 0, "%d :%s" % (ec, error_msg))
if ec == 0:
self.assertTrue(j.Percent == 100, "P= %f" % j.Percent)
rc = j.Result
j.Remove()
break
if j.Wait(1):
self.assertTrue(j.Wait(0))
j.update()
self.assertTrue(j.Complete)
return rc
def test_lv_create_pv_specific(self):
vg = self._vg_create().Vg
lv_name = lv_n()
pv = vg.Pvs
pvp = ClientProxy(self.bus, pv[0], interfaces=(PV_INT,))
lv = self._test_lv_create(
vg.LvCreate, (
dbus.String(lv_name),
dbus.UInt64(mib(4)),
dbus.Array(
[[pvp.object_path, 0, (pvp.Pv.PeCount - 1)]],
signature='(ott)'),
dbus.Int32(g_tmo), EOD), vg, LV_BASE_INT)
self._validate_lookup("%s/%s" % (vg.Name, lv_name), lv.object_path)
def _test_lv_resize(self, lv):
# Can't resize cache or thin pool volumes or vdo pool lv
if lv.LvCommon.Attr[0] == 'C' or lv.LvCommon.Attr[0] == 't' or \
lv.LvCommon.Attr[0] == 'd':
return
vg = ClientProxy(self.bus, lv.LvCommon.Vg, interfaces=(VG_INT,)).Vg
start_size = lv.LvCommon.SizeBytes
# Vdo are fairly big and need large re-size amounts.
if start_size > mib(4) * 3:
delta = mib(4)
else:
delta = 16384
for size in [start_size + delta, start_size - delta]:
pv_in_use = [i[0] for i in lv.LvCommon.Devices]
# Select a PV in the VG that isn't in use
pv_empty = [p for p in vg.Pvs if p not in pv_in_use]
prev = lv.LvCommon.SizeBytes
if len(pv_empty):
p = ClientProxy(self.bus, pv_empty[0], interfaces=(PV_INT,))
rc = self.handle_return(
lv.Lv.Resize(
dbus.UInt64(size),
dbus.Array(
[[p.object_path, 0, p.Pv.PeCount - 1]], '(oii)'),
dbus.Int32(g_tmo), EOD))
else:
rc = self.handle_return(
lv.Lv.Resize(
dbus.UInt64(size),
dbus.Array([], '(oii)'),
dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
self._check_consistency()
lv.update()
if prev < size:
self.assertTrue(lv.LvCommon.SizeBytes > prev)
else:
# We are testing re-sizing to same size too...
self.assertTrue(lv.LvCommon.SizeBytes <= prev)
def test_lv_resize(self):
pv_paths = [
self.objs[PV_INT][0].object_path, self.objs[PV_INT][1].object_path]
vg = self._vg_create(pv_paths).Vg
lv = self._create_lv(vg=vg, size=mib(16))
self._test_lv_resize(lv)
def test_lv_resize_same(self):
vg = self._vg_create(self._all_pv_object_paths()).Vg
lv = self._create_lv(vg=vg)
with self.assertRaises(dbus.exceptions.DBusException):
lv.Lv.Resize(
dbus.UInt64(lv.LvCommon.SizeBytes),
dbus.Array([], '(oii)'),
dbus.Int32(-1), EOD)
def test_lv_move(self):
lv = self._create_lv()
pv_path_move = str(lv.LvCommon.Devices[0][0])
# Test moving a specific LV
rc = self.handle_return(
lv.Lv.Move(
dbus.ObjectPath(pv_path_move),
dbus.Struct((0, 0), signature='(tt)'),
dbus.Array([], '(ott)'), dbus.Int32(g_tmo),
EOD))
self.assertTrue(rc == '/')
self._check_consistency()
lv.update()
new_pv = str(lv.LvCommon.Devices[0][0])
self.assertTrue(
pv_path_move != new_pv, "%s == %s" % (pv_path_move, new_pv))
def _test_activate_deactivate(self, lv_p):
self.handle_return(lv_p.Lv.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
lv_p.update()
self.assertFalse(lv_p.LvCommon.Active)
self._check_consistency()
self.handle_return(lv_p.Lv.Activate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
lv_p.update()
self.assertTrue(lv_p.LvCommon.Active)
# Vdo property "IndexState" when getting activated goes from
# "opening" -> "online" after we have returned from the activate call
# thus when we try to check the consistency we fail as the property
# is changing on it's own and not because the lvmdbusd failed to
# refresh it's own state. One solution is to not expose IndexState as
# a property.
# TODO Expose method to determine if Lv is partaking in VDO.
vg = ClientProxy(self.bus, lv_p.LvCommon.Vg, interfaces=(VG_INT,))
if "vdo" not in vg.Vg.Name:
self._check_consistency()
# Try control flags
for i in range(0, 5):
self.handle_return(lv_p.Lv.Activate(
dbus.UInt64(1 << i),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(lv_p.LvCommon.Active)
self._check_consistency()
def test_lv_activate_deactivate(self):
lv_p = self._create_lv()
self._test_activate_deactivate(lv_p)
def test_move(self):
lv = self._create_lv()
# Test moving without being LV specific
vg = ClientProxy(self.bus, lv.LvCommon.Vg, interfaces=(VG_INT, )).Vg
pv_to_move = str(lv.LvCommon.Devices[0][0])
rc = self.handle_return(
vg.Move(
dbus.ObjectPath(pv_to_move),
dbus.Struct((0, 0), signature='tt'),
dbus.Array([], '(ott)'),
dbus.Int32(0),
EOD))
self.assertEqual(rc, '/')
self._check_consistency()
vg.update()
lv.update()
location = lv.LvCommon.Devices[0][0]
dst = None
for p in vg.Pvs:
if p != location:
dst = p
# Fetch the destination
pv = ClientProxy(self.bus, dst, interfaces=(PV_INT, )).Pv
# Test range, move it to the middle of the new destination
job = self.handle_return(
vg.Move(
dbus.ObjectPath(location),
dbus.Struct((0, 0), signature='tt'),
dbus.Array([(dst, pv.PeCount // 2, 0), ], '(ott)'),
dbus.Int32(g_tmo),
EOD))
self.assertEqual(job, '/')
self._check_consistency()
def test_job_handling(self):
pv_paths = self._all_pv_object_paths()
vg_name = vg_n()
# Test getting a job right away
vg_path, vg_job = self.objs[MANAGER_INT][0].Manager.VgCreate(
dbus.String(vg_name),
dbus.Array(pv_paths, 'o'),
dbus.Int32(0),
EOD)
self.assertTrue(vg_path == '/')
self.assertTrue(vg_job and len(vg_job) > 0)
vg_path = self._wait_for_job(vg_job)
self._validate_lookup(vg_name, vg_path)
def _test_expired_timer(self, num_lvs):
rc = False
# In small configurations lvm is pretty snappy, so let's create a VG
# add a number of LVs and then remove the VG and all the contained
# LVs which appears to consistently run a little slow.
vg_proxy = self._vg_create(self._all_pv_object_paths())
for i in range(0, num_lvs):
lv_name = lv_n()
vg_proxy.update()
if vg_proxy.Vg.FreeCount > 0:
lv_path = self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(lv_name),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(lv_path != '/')
self._validate_lookup(
"%s/%s" % (vg_proxy.Vg.Name, lv_name), lv_path)
else:
# We ran out of space, test will probably fail
break
# Make sure that we are honoring the timeout
start = time.time()
remove_job = vg_proxy.Vg.Remove(dbus.Int32(1), EOD)
end = time.time()
tt_remove = float(end) - float(start)
self.assertTrue(tt_remove < 2.0, "remove time %s" % (str(tt_remove)))
# Depending on how long it took we could finish either way
if remove_job != '/':
# We got a job
result = self._wait_for_job(remove_job)
self.assertTrue(result == '/')
rc = True
else:
# It completed before timer popped
pass
return rc
# noinspection PyUnusedLocal
def test_job_handling_timer(self):
yes = False
for pp in self.objs[PV_INT]:
if '/dev/sd' not in pp.Pv.Name:
std_err_print("Skipping test_job_handling_timer on loopback")
return
# This may not pass
for i in [128, 256]:
yes = self._test_expired_timer(i)
if yes:
break
std_err_print('Attempt (%d) failed, trying again...' % (i))
self.assertTrue(yes)
def test_pv_tags(self):
pvs = []
vg = self._vg_create(self._all_pv_object_paths()).Vg
# Get the PVs
for p in vg.Pvs:
pvs.append(ClientProxy(self.bus, p, interfaces=(PV_INT, )).Pv)
for tags_value in [['hello'], ['foo', 'bar']]:
rc = self.handle_return(
vg.PvTagsAdd(
dbus.Array(vg.Pvs, 'o'),
dbus.Array(tags_value, 's'),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(rc == '/')
for p in pvs:
p.update()
self.assertTrue(sorted(tags_value) == p.Tags)
rc = self.handle_return(
vg.PvTagsDel(
dbus.Array(vg.Pvs, 'o'),
dbus.Array(tags_value, 's'),
dbus.Int32(g_tmo),
EOD))
self.assertEqual(rc, '/')
for p in pvs:
p.update()
self.assertTrue([] == p.Tags)
def test_vg_tags(self):
vg = self._vg_create().Vg
t = ['Testing', 'tags']
self.handle_return(
vg.TagsAdd(
dbus.Array(t, 's'),
dbus.Int32(g_tmo),
EOD))
vg.update()
self.assertTrue(t == vg.Tags)
self.handle_return(
vg.TagsDel(
dbus.Array(t, 's'),
dbus.Int32(g_tmo),
EOD))
vg.update()
self.assertTrue([] == vg.Tags)
def _test_lv_tags(self, lv):
t = ['Testing', 'tags']
self.handle_return(
lv.Lv.TagsAdd(
dbus.Array(t, 's'), dbus.Int32(g_tmo), EOD))
self._check_consistency()
lv.update()
self.assertTrue(t == lv.LvCommon.Tags)
self.handle_return(
lv.Lv.TagsDel(
dbus.Array(t, 's'),
dbus.Int32(g_tmo),
EOD))
self._check_consistency()
lv.update()
self.assertTrue([] == lv.LvCommon.Tags)
def test_lv_tags(self):
vg = self._vg_create().Vg
lv = self._create_lv(vg=vg)
self._test_lv_tags(lv)
def test_vg_allocation_policy_set(self):
vg = self._vg_create().Vg
for p in ['anywhere', 'contiguous', 'cling', 'normal']:
rc = self.handle_return(
vg.AllocationPolicySet(
dbus.String(p), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
vg.update()
prop = getattr(vg, 'Alloc' + p.title())
self.assertTrue(prop)
def test_vg_max_pv(self):
vg = self._vg_create([self.objs[PV_INT][0].object_path]).Vg
for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]:
rc = self.handle_return(
vg.MaxPvSet(
dbus.UInt64(p), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
vg.update()
self.assertTrue(
vg.MaxPv == p,
"Expected %s != Actual %s" % (str(p), str(vg.MaxPv)))
def test_vg_max_lv(self):
vg = self._vg_create().Vg
for p in [0, 1, 10, 100, 100, 1024, 2 ** 32 - 1]:
rc = self.handle_return(
vg.MaxLvSet(
dbus.UInt64(p), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
vg.update()
self.assertTrue(
vg.MaxLv == p,
"Expected %s != Actual %s" % (str(p), str(vg.MaxLv)))
def test_vg_uuid_gen(self):
vg = self._vg_create().Vg
prev_uuid = vg.Uuid
rc = self.handle_return(
vg.UuidGenerate(
dbus.Int32(g_tmo),
EOD))
self.assertEqual(rc, '/')
vg.update()
self.assertTrue(
vg.Uuid != prev_uuid,
"Expected %s != Actual %s" % (vg.Uuid, prev_uuid))
def test_vg_activate_deactivate(self):
vg = self._vg_create().Vg
self._create_lv(vg=vg)
vg.update()
rc = self.handle_return(
vg.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
self._check_consistency()
rc = self.handle_return(
vg.Activate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
self.assertEqual(rc, '/')
self._check_consistency()
# Try control flags
for i in range(0, 5):
self.handle_return(
vg.Activate(
dbus.UInt64(1 << i),
dbus.Int32(g_tmo),
EOD))
def test_pv_resize(self):
self.assertTrue(len(self.objs[PV_INT]) > 0)
if len(self.objs[PV_INT]) > 0:
pv = ClientProxy(
self.bus, self.objs[PV_INT][0].object_path,
interfaces=(PV_INT, )).Pv
original_size = pv.SizeBytes
new_size = original_size // 2
self.handle_return(
pv.ReSize(
dbus.UInt64(new_size),
dbus.Int32(g_tmo),
EOD))
self._check_consistency()
pv.update()
self.assertTrue(pv.SizeBytes != original_size)
self.handle_return(
pv.ReSize(
dbus.UInt64(0),
dbus.Int32(g_tmo),
EOD))
self._check_consistency()
pv.update()
self.assertTrue(pv.SizeBytes == original_size)
def test_pv_allocation(self):
vg = self._vg_create(self._all_pv_object_paths()).Vg
pv = ClientProxy(self.bus, vg.Pvs[0], interfaces=(PV_INT, )).Pv
self.handle_return(
pv.AllocationEnabled(
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
pv.update()
self.assertFalse(pv.Allocatable)
self.handle_return(
pv.AllocationEnabled(
dbus.Boolean(True),
dbus.Int32(g_tmo),
EOD))
self.handle_return(
pv.AllocationEnabled(
dbus.Boolean(True),
dbus.Int32(g_tmo),
EOD))
pv.update()
self.assertTrue(pv.Allocatable)
self._check_consistency()
@staticmethod
def _get_devices():
context = pyudev.Context()
bd = context.list_devices(subsystem='block')
# Handle block extended major too (259)
return [b for b in bd if b.properties.get('MAJOR') == '8' or
b.properties.get('MAJOR') == '259']
def _pv_scan(self, activate, cache, device_paths, major_minors):
mgr = self._manager().Manager
return self.handle_return(
mgr.PvScan(
dbus.Boolean(activate),
dbus.Boolean(cache),
dbus.Array(device_paths, 's'),
dbus.Array(major_minors, '(ii)'),
dbus.Int32(g_tmo),
EOD))
def test_pv_scan(self):
def major_minor(d):
return (int(d.properties.get('MAJOR')), int(d.properties.get('MINOR')))
devices = TestDbusService._get_devices()
self.assertEqual(self._pv_scan(False, True, [], []), '/')
self._check_consistency()
self.assertEqual(self._pv_scan(False, False, [], []), '/')
self._check_consistency()
block_path = [d.properties.get('DEVNAME') for d in devices]
self.assertEqual(self._pv_scan(False, True, block_path, []), '/')
self._check_consistency()
mm = [major_minor(d) for d in devices]
self.assertEqual(self._pv_scan(False, True, block_path, mm), '/')
self._check_consistency()
self.assertEqual(self._pv_scan(False, True, [], mm), '/')
self._check_consistency()
@staticmethod
def _write_some_data(device_path, size):
blocks = int(size // 512)
block = bytearray(512)
for i in range(0, 512):
block[i] = i % 255
with open(device_path, mode='wb') as lv:
for i in range(0, blocks):
lv.write(block)
def test_snapshot_merge(self):
# Create a non-thin LV and merge it
ss_size = mib(8)
lv_p = self._create_lv(size=mib(16))
ss_name = lv_p.LvCommon.Name + '_snap'
snapshot_path = self.handle_return(
lv_p.Lv.Snapshot(
dbus.String(ss_name),
dbus.UInt64(ss_size),
dbus.Int32(g_tmo),
EOD))
intf = (LV_COMMON_INT, LV_INT, SNAPSHOT_INT, )
ss = ClientProxy(self.bus, snapshot_path, interfaces=intf)
# Write some data to snapshot so merge takes some time
TestDbusService._write_some_data(ss.LvCommon.Path, ss_size // 2)
job_path = self.handle_return(
ss.Snapshot.Merge(
dbus.Int32(g_tmo),
EOD))
self.assertEqual(job_path, '/')
def test_snapshot_merge_thin(self):
# Create a thin LV, snapshot it and merge it
_vg, _thin_path, lv_p = self._create_thin_lv()
ss_name = lv_p.LvCommon.Name + '_snap'
snapshot_path = self.handle_return(
lv_p.Lv.Snapshot(
dbus.String(ss_name),
dbus.UInt64(0),
dbus.Int32(g_tmo),
EOD))
intf = (LV_INT, LV_COMMON_INT, SNAPSHOT_INT)
ss = ClientProxy(self.bus, snapshot_path, interfaces=intf)
job_path = self.handle_return(
ss.Snapshot.Merge(
dbus.Int32(g_tmo), EOD)
)
self.assertTrue(job_path == '/')
def _create_cache_pool(self, vg=None):
if not vg:
vg = self._vg_create().Vg
md = self._create_lv(size=(mib(8)), vg=vg)
data = self._create_lv(size=(mib(8)), vg=vg)
cache_pool_path = self.handle_return(
vg.CreateCachePool(
dbus.ObjectPath(md.object_path),
dbus.ObjectPath(data.object_path),
dbus.Int32(g_tmo),
EOD))
intf = (CACHE_POOL_INT, )
cp = ClientProxy(self.bus, cache_pool_path, interfaces=intf)
return vg, cp
def test_cache_pool_create(self):
vg, cache_pool = self._create_cache_pool()
self.assertTrue(
'/com/redhat/lvmdbus1/CachePool' in cache_pool.object_path)
def _create_cache_lv(self, return_all=False):
vg, cache_pool = self._create_cache_pool()
lv_to_cache = self._create_lv(size=mib(32), vg=vg)
c_lv_path = self.handle_return(
cache_pool.CachePool.CacheLv(
dbus.ObjectPath(lv_to_cache.object_path),
dbus.Int32(g_tmo),
EOD))
intf = (LV_COMMON_INT, LV_INT, CACHE_LV_INT)
cached_lv = ClientProxy(self.bus, c_lv_path, interfaces=intf)
if return_all:
return vg, cache_pool, cached_lv
return cached_lv
def test_cache_lv_create(self):
for destroy_cache in [True, False]:
vg, _, cached_lv = self._create_cache_lv(True)
uncached_lv_path = self.handle_return(
cached_lv.CachedLv.DetachCachePool(
dbus.Boolean(destroy_cache),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(
'/com/redhat/lvmdbus1/Lv' in uncached_lv_path)
rc = self.handle_return(
vg.Remove(dbus.Int32(g_tmo), EOD))
self.assertTrue(rc == '/')
def test_cache_lv_rename(self):
"""
Make sure that if we rename a cache lv that we correctly handle the
internal state update.
:return:
"""
def verify_cache_lv_count():
cur_objs, _ = get_objects()
self.assertEqual(len(cur_objs[CACHE_LV_INT]), 2)
self._check_consistency()
cached_lv = self._create_cache_lv()
verify_cache_lv_count()
new_name = 'renamed_' + cached_lv.LvCommon.Name
self.handle_return(
cached_lv.Lv.Rename(dbus.String(new_name), dbus.Int32(g_tmo), EOD))
verify_cache_lv_count()
def test_writecache_lv(self):
vg = self._vg_create().Vg
data_lv = self._create_lv(size=mib(16), vg=vg)
cache_lv = self._create_lv(size=mib(16), vg=vg)
# both LVs need to be inactive
self.handle_return(data_lv.Lv.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
data_lv.update()
self.handle_return(cache_lv.Lv.Deactivate(
dbus.UInt64(0), dbus.Int32(g_tmo), EOD))
cache_lv.update()
cached_lv_path = self.handle_return(
cache_lv.Lv.WriteCacheLv(
dbus.ObjectPath(data_lv.object_path),
dbus.Int32(g_tmo),
EOD))
intf = (LV_COMMON_INT, LV_INT, CACHE_LV_INT)
cached_lv = ClientProxy(self.bus, cached_lv_path, interfaces=intf)
self.assertEqual(cached_lv.LvCommon.SegType, ["writecache"])
uncached_lv_path = self.handle_return(
cached_lv.CachedLv.DetachCachePool(
dbus.Boolean(True),
dbus.Int32(g_tmo),
EOD))
self.assertTrue('/com/redhat/lvmdbus1/Lv' in uncached_lv_path)
def test_vg_change(self):
vg_proxy = self._vg_create()
result = self.handle_return(vg_proxy.Vg.Change(
dbus.Int32(g_tmo),
dbus.Dictionary({'-a': 'ay'}, 'sv')))
self.assertTrue(result == '/')
result = self.handle_return(
vg_proxy.Vg.Change(
dbus.Int32(g_tmo),
dbus.Dictionary({'-a': 'n'}, 'sv')))
self.assertTrue(result == '/')
@staticmethod
def _invalid_vg_lv_name_characters():
bad_vg_lv_set = set(string.printable) - \
set(string.ascii_letters + string.digits + '.-_+')
return ''.join(bad_vg_lv_set)
def test_invalid_names(self):
mgr = self.objs[MANAGER_INT][0].Manager
# Pv device path
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.PvCreate(
dbus.String("/dev/space in name"),
dbus.Int32(g_tmo),
EOD))
# VG Name testing...
# Go through all bad characters
pv_paths = [self.objs[PV_INT][0].object_path]
bad_chars = TestDbusService._invalid_vg_lv_name_characters()
for c in bad_chars:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.VgCreate(
dbus.String("name%s" % (c)),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
# Bad names
for bad in [".", ".."]:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.VgCreate(
dbus.String(bad),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
# Exceed name length
for i in [128, 1024, 4096]:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
mgr.VgCreate(
dbus.String('T' * i),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
# Create a VG and try to create LVs with different bad names
vg_name = vg_n()
vg_path = self.handle_return(
mgr.VgCreate(
dbus.String(vg_name),
dbus.Array(pv_paths, 'o'),
dbus.Int32(g_tmo),
EOD))
self._validate_lookup(vg_name, vg_path)
vg_proxy = ClientProxy(self.bus, vg_path, interfaces=(VG_INT, ))
for c in bad_chars:
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(lv_n() + c),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
for reserved in (
"_cdata", "_cmeta", "_corig", "_mimage", "_mlog",
"_pmspare", "_rimage", "_rmeta", "_tdata", "_tmeta",
"_vorigin", "_vdata"):
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(lv_n() + reserved),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
for reserved in ("snapshot", "pvmove"):
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.LvCreateLinear(
dbus.String(reserved + lv_n()),
dbus.UInt64(mib(4)),
dbus.Boolean(False),
dbus.Int32(g_tmo),
EOD))
_ALLOWABLE_TAG_CH = string.ascii_letters + string.digits + "._-+/=!:&#"
def _invalid_tag_characters(self):
bad_tag_ch_set = set(string.printable) - set(self._ALLOWABLE_TAG_CH)
return ''.join(bad_tag_ch_set)
def test_invalid_tags(self):
vg_proxy = self._vg_create()
for c in self._invalid_tag_characters():
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.TagsAdd(
dbus.Array([c], 's'),
dbus.Int32(g_tmo),
EOD))
for c in self._invalid_tag_characters():
with self.assertRaises(dbus.exceptions.DBusException):
self.handle_return(
vg_proxy.Vg.TagsAdd(
dbus.Array(["a%sb" % (c)], 's'),
dbus.Int32(g_tmo),
EOD))
def _tag_add_common(self, vg_proxy, tag):
tmp = self.handle_return(
vg_proxy.Vg.TagsAdd(
dbus.Array([tag], 's'),
dbus.Int32(g_tmo),
EOD))
self.assertTrue(tmp == '/')
vg_proxy.update()
self.assertTrue(
tag in vg_proxy.Vg.Tags,
"%s not in %s" % (tag, str(vg_proxy.Vg.Tags)))
def test_tag_names(self):
vg_proxy = self._vg_create()
for i in range(1, 64):
tag = rs(i, "", self._ALLOWABLE_TAG_CH)
self._tag_add_common(vg_proxy, tag)
self.assertEqual(
i, len(vg_proxy.Vg.Tags),
"%d != %d" % (i, len(vg_proxy.Vg.Tags)))
def test_tag_regression(self):
vg_proxy = self._vg_create()
tag = '--h/K.6g0A4FOEatf3+k_nI/Yp&L_u2oy-=j649x:+dUcYWPEo6.IWT0c'
self._tag_add_common(vg_proxy, tag)
def _verify_existence(self, cmd, operation, resource_name):
ec, stdout, stderr = call_lvm(cmd)
if ec == 0:
path = self._lookup(resource_name)
self.assertTrue(path != '/')
else:
std_err_print(
"%s failed with stdout= %s, stderr= %s" %
(operation, stdout, stderr))
self.assertTrue(ec == 0, "%s exit code = %d" % (operation, ec))
def test_external_vg_create(self):
# We need to ensure that if a user creates something outside lvm
# dbus service that things are sequenced correctly so that if a dbus
# user calls into the service they will find the same information.
vg_name = vg_n()
# Get all the PV device paths
pv_device_paths = [p.Pv.Name for p in self.objs[PV_INT]]
cmd = ['vgcreate', vg_name]
cmd.extend(pv_device_paths)
self._verify_existence(cmd, cmd[0], vg_name)
def test_external_lv_create(self):
# Let's create a LV outside of service and see if we correctly handle
# its inclusion
vg = self._vg_create().Vg
lv_name = lv_n()
full_name = "%s/%s" % (vg.Name, lv_name)
cmd = ['lvcreate', '-L4M', '-n', lv_name, vg.Name]
self._verify_existence(cmd, cmd[0], full_name)
def test_external_pv_create(self):
# Let's create a PV outside of service and see if we correctly handle
# its inclusion
target = self.objs[PV_INT][0]
# Remove the PV
rc = self._pv_remove(target)
self.assertTrue(rc == '/')
self._check_consistency()
# Make sure the PV we removed no longer exists
self.assertTrue(self._lookup(target.Pv.Name) == '/')
# Add it back with external command line
cmd = ['pvcreate', target.Pv.Name]
self._verify_existence(cmd, cmd[0], target.Pv.Name)
def _create_nested(self, pv_object_path):
vg = self._vg_create([pv_object_path])
pv = ClientProxy(self.bus, pv_object_path, interfaces=(PV_INT,))
self.assertEqual(pv.Pv.Vg, vg.object_path)
self.assertIn(
pv_object_path, vg.Vg.Pvs, "Expecting PV object path in Vg.Pvs")
lv = self._create_lv(
vg=vg.Vg, size=vg.Vg.FreeBytes, suffix="_pv0")
device_path = '/dev/%s/%s' % (vg.Vg.Name, lv.LvCommon.Name)
new_pv_object_path = self._pv_create(device_path)
vg.update()
self.assertEqual(lv.LvCommon.Vg, vg.object_path)
self.assertIn(
lv.object_path, vg.Vg.Lvs, "Expecting LV object path in Vg.Lvs")
new_pv_proxy = ClientProxy(
self.bus, new_pv_object_path, interfaces=(PV_INT, ))
self.assertEqual(new_pv_proxy.Pv.Name, device_path)
return new_pv_object_path
def test_nesting(self):
# check to see if we handle an LV becoming a PV which has it's own
# LV
#
# NOTE: This needs an equivalent of aux extend_filter_LVMTEST
# when run from lvm2 testsuite. See dbustest.sh.
# Also, if developing locally with actual devices one can achieve this
# by editing lvm.conf with "devices/scan_lvs = 1" As testing
# typically utilizes loopback, this test is skipped in
# those environments.
if dm_dev_dir != '/dev':
raise unittest.SkipTest('test not running in real /dev')
pv_object_path = self.objs[PV_INT][0].object_path
if not self.objs[PV_INT][0].Pv.Name.startswith("/dev"):
raise unittest.SkipTest('test not running in /dev')
for i in range(0, 5):
pv_object_path = self._create_nested(pv_object_path)
def test_pv_symlinks(self):
# Let's take one of our test PVs, pvremove it, find a symlink to it
# and re-create using the symlink to ensure we return an object
# path to it. Additionally, we will take the symlink and do a lookup
# (Manager.LookUpByLvmId) using it and the original device path to
# ensure that we can find the PV.
symlink = None
pv = self.objs[PV_INT][0]
pv_device_path = pv.Pv.Name
if dm_dev_dir != '/dev':
raise unittest.SkipTest('test not running in real /dev')
if not pv_device_path.startswith("/dev"):
raise unittest.SkipTest('test not running in /dev')
self._pv_remove(pv)
# Make sure we no longer find the pv
rc = self._lookup(pv_device_path)
self.assertEqual(rc, '/')
# Let's locate a symlink for it
devices = glob('/dev/disk/*/*')
rp_pv_device_path = os.path.realpath(pv_device_path)
for d in devices:
if rp_pv_device_path == os.path.realpath(d):
symlink = d
break
self.assertIsNotNone(symlink, "We expected to find at least 1 symlink!")
# Make sure symlink look up fails too
rc = self._lookup(symlink)
self.assertEqual(rc, '/')
### pv_object_path = self._pv_create(symlink)
### Test is limited by filter rules and must use /dev/mapper/LVMTEST path
pv_object_path = self._pv_create(pv_device_path)
self.assertNotEqual(pv_object_path, '/')
pv_proxy = ClientProxy(self.bus, pv_object_path, interfaces=(PV_INT, ))
self.assertEqual(pv_proxy.Pv.Name, pv_device_path)
# Lets check symlink lookup
self.assertEqual(pv_object_path, self._lookup(pv_device_path))
def _create_vdo_pool_and_lv(self, vg_prefix="vdo_"):
pool_name = lv_n("_vdo_pool")
lv_name = lv_n()
vg_proxy = self._vg_create(vg_prefix=vg_prefix)
vdo_pool_object_path = self.handle_return(
vg_proxy.VgVdo.CreateVdoPoolandLv(
pool_name, lv_name,
dbus.UInt64(VDO_MIN_SIZE),
dbus.UInt64(VDO_MIN_SIZE * 2),
dbus.Int32(g_tmo),
EOD))
self.assertNotEqual(vdo_pool_object_path, "/")
self.assertEqual(
vdo_pool_object_path,
self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name)))
vdo_pool_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name))
self.assertNotEqual(vdo_pool_path, "/")
intf = [LV_COMMON_INT, LV_INT]
vdo_lv_obj_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, lv_name))
vdo_lv = ClientProxy(self.bus, vdo_lv_obj_path, interfaces=intf)
intf.append(VDOPOOL_INT)
vdo_pool_lv = ClientProxy(self.bus, vdo_pool_path, interfaces=intf)
return vg_proxy, vdo_pool_lv, vdo_lv
def _create_vdo_lv(self):
return self._create_vdo_pool_and_lv()[2]
def _vdo_pool_lv(self):
return self._create_vdo_pool_and_lv()[1]
def test_vdo_pool_create(self):
# Basic vdo sanity testing
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
# Do this twice to ensure we are providing the correct flags to force
# the operation when it finds an existing vdo signature, which likely
# shouldn't exist.
for _ in range(0, 2):
vg, _, _ = self._create_vdo_pool_and_lv()
self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
def _create_vdo_pool(self):
pool_name = lv_n('_vdo_pool')
lv_name = lv_n('_vdo_data')
vg_proxy = self._vg_create(vg_prefix="vdo_conv_")
lv = self._test_lv_create(
vg_proxy.Vg.LvCreate,
(dbus.String(pool_name), dbus.UInt64(VDO_MIN_SIZE),
dbus.Array([], signature='(ott)'), dbus.Int32(g_tmo),
EOD), vg_proxy.Vg, LV_BASE_INT)
lv_obj_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name))
self.assertNotEqual(lv_obj_path, "/")
vdo_pool_path = self.handle_return(
vg_proxy.VgVdo.CreateVdoPool(
dbus.ObjectPath(lv.object_path), lv_name,
dbus.UInt64(VDO_MIN_SIZE),
dbus.Int32(g_tmo),
EOD))
self.assertNotEqual(vdo_pool_path, "/")
self.assertEqual(
vdo_pool_path,
self._lookup("%s/%s" % (vg_proxy.Vg.Name, pool_name)))
intf = [LV_COMMON_INT, LV_INT]
vdo_lv_obj_path = self._lookup("%s/%s" % (vg_proxy.Vg.Name, lv_name))
vdo_lv = ClientProxy(self.bus, vdo_lv_obj_path, interfaces=intf)
intf.append(VDOPOOL_INT)
vdo_pool_lv = ClientProxy(self.bus, vdo_pool_path, interfaces=intf)
return vg_proxy, vdo_pool_lv, vdo_lv
def test_vdo_pool_convert(self):
# Basic vdo sanity testing
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
vg, _pool, _lv = self._create_vdo_pool()
self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
def test_vdo_pool_compression_deduplication(self):
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
vg, pool, _lv = self._create_vdo_pool_and_lv(vg_prefix="vdo2_")
# compression and deduplication should be enabled by default
self.assertEqual(pool.VdoPool.Compression, "enabled")
self.assertEqual(pool.VdoPool.Deduplication, "enabled")
self.handle_return(
pool.VdoPool.DisableCompression(dbus.Int32(g_tmo), EOD))
self.handle_return(
pool.VdoPool.DisableDeduplication(dbus.Int32(g_tmo), EOD))
pool.update()
self.assertEqual(pool.VdoPool.Compression, "")
self.assertEqual(pool.VdoPool.Deduplication, "")
self.handle_return(
pool.VdoPool.EnableCompression(dbus.Int32(g_tmo), EOD))
self.handle_return(
pool.VdoPool.EnableDeduplication(dbus.Int32(g_tmo), EOD))
pool.update()
self.assertEqual(pool.VdoPool.Compression, "enabled")
self.assertEqual(pool.VdoPool.Deduplication, "enabled")
self.handle_return(vg.Vg.Remove(dbus.Int32(g_tmo), EOD))
def _test_lv_method_interface(self, lv):
self._rename_lv_test(lv)
self._test_activate_deactivate(lv)
self._test_lv_tags(lv)
self._test_lv_resize(lv)
def _test_lv_method_interface_sequence(
self, lv, test_ss=True, remove_lv=True):
self._test_lv_method_interface(lv)
# We can't take a snapshot of a pool lv (not yet).
if test_ss:
ss_lv = self._take_lv_snapshot(lv)
self._test_lv_method_interface(ss_lv)
self._lv_remove(ss_lv)
if remove_lv:
self._lv_remove(lv)
def test_lv_interface_plain_lv(self):
self._test_lv_method_interface_sequence(self._create_lv())
def test_lv_interface_vdo_lv(self):
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
self._test_lv_method_interface_sequence(self._create_vdo_lv())
def test_lv_interface_cache_lv(self):
self._test_lv_method_interface_sequence(
self._create_cache_lv(), remove_lv=False)
def test_lv_interface_thin_pool_lv(self):
self._test_lv_method_interface_sequence(
self._create_thin_pool_lv(), test_ss=False)
def test_lv_interface_vdo_pool_lv(self):
if not self.vdo:
raise unittest.SkipTest('vdo not supported')
self._test_lv_method_interface_sequence(
self._vdo_pool_lv(), test_ss=False)
class AggregateResults(object):
def __init__(self):
self.no_errors = True
def register_result(self, result):
if not result.result.wasSuccessful():
self.no_errors = False
def register_fail(self):
self.no_errors = False
def exit_run(self):
if self.no_errors:
sys.exit(0)
sys.exit(1)
if __name__ == '__main__':
r = AggregateResults()
mode = int(test_shell)
if mode == 0:
std_err_print('\n*** Testing only lvm fork & exec test mode ***\n')
elif mode == 1:
std_err_print('\n*** Testing only lvm shell mode ***\n')
elif mode == 2:
std_err_print('\n*** Testing fork & exec & lvm shell mode ***\n')
else:
std_err_print("Unsupported \"LVM_DBUSD_TEST_MODE\"=%d, [0-2] valid" % mode)
sys.exit(1)
for g_tmo in [0, 15]:
std_err_print('Testing TMO=%d\n' % g_tmo)
if mode == 0:
if set_execution(False, r):
r.register_result(unittest.main(exit=False))
elif mode == 1:
if set_execution(True, r):
r.register_result(unittest.main(exit=False))
else:
if set_execution(False, r):
r.register_result(unittest.main(exit=False))
# Test lvm shell
if set_execution(True, r):
r.register_result(unittest.main(exit=False))
if not r.no_errors:
break
r.exit_run()