1
0
mirror of git://sourceware.org/git/lvm2.git synced 2024-12-21 13:34:40 +03:00

lvmdbustest: Add test for wipefs

Ensure that if an external program or user calles wipefs on a PV that we
correctly update the state of the daemon.
This commit is contained in:
Tony Asleson 2022-10-18 12:26:14 -05:00
parent 0f56c1ad12
commit 5a6ae2d4d8

View File

@ -2385,6 +2385,40 @@ class TestDbusService(unittest.TestCase):
finally: finally:
set_exec_mode(g_lvm_shell) set_exec_mode(g_lvm_shell)
@staticmethod
def _wipe_it(block_device):
cmd = ["/usr/sbin/wipefs", '-a', block_device]
config = Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True, env=os.environ)
config.communicate()
if config.returncode != 0:
return False
return True
def test_wipefs(self):
# Ensure we update the status of the daemon if an external process clears a PV
pv = self.objs[PV_INT][0]
pv_device_path = pv.Pv.Name
wipe_result = TestDbusService._wipe_it(pv_device_path)
self.assertTrue(wipe_result)
if wipe_result:
# Need to wait a bit before the daemon will reflect the change
start = time.time()
found = True
while found and time.time() < start + 10:
if (self._lookup(pv_device_path) == "/"):
found = False
print("Note: Time for udev update = %f" % (time.time() - start))
# Make sure that the service no longer has it
rc = self._lookup(pv_device_path)
self.assertEqual(rc, '/')
# Put it back
pv_object_path = self._pv_create(pv_device_path)
self.assertNotEqual(pv_object_path, '/')
class AggregateResults(object): class AggregateResults(object):