2012-10-26 15:35:17 -05:00
#!/usr/bin/env python
2013-07-26 09:51:58 -05:00
# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
2012-10-26 15:35:17 -05:00
#
# This file is part of LVM2.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import unittest
import random
import string
import lvm
2013-07-26 09:51:58 -05:00
import os
2013-11-18 15:09:09 -06:00
import itertools
2012-10-26 15:35:17 -05:00
# Set of basic unit tests for the python bindings.
2013-07-26 09:51:58 -05:00
#
# *** WARNING ***
#
# This test tries to only modify configuration for the list of allowed
# PVs, but an error in it could potentially cause data loss if run on a
# production system. Therefore it is strongly advised that this unit test
# not be run on a system that contains data of value.
2012-10-26 15:35:17 -05:00
2013-09-11 18:13:18 -05:00
fh = None
def l ( txt ) :
if os . environ . get ( ' PY_UNIT_LOG ' ) is not None :
global fh
if fh is None :
fh = open ( ' /tmp/lvm_py_unit_test_ ' + rs ( 10 ) , " a " )
fh . write ( txt + " \n " )
fh . flush ( )
2013-07-25 18:24:52 -04:00
2013-11-18 15:07:32 -06:00
def rs ( rand_len = 10 ) :
2012-10-26 15:35:17 -05:00
"""
Generate a random string
"""
2015-04-07 15:36:11 -05:00
return ' ' . join (
random . choice ( string . ascii_uppercase ) for x in range ( rand_len ) )
2012-10-26 15:35:17 -05:00
2013-07-26 09:51:58 -05:00
def _get_allowed_devices ( ) :
rc = os . environ . get ( ' PY_UNIT_PVS ' )
if rc is not None :
2014-06-05 23:05:52 +02:00
rc = rc . splitlines ( )
2013-07-26 09:51:58 -05:00
rc . sort ( )
return rc
2013-11-18 15:07:32 -06:00
def compare_pv ( right , left ) :
r_name = right . getName ( )
l_name = left . getName ( )
2013-07-26 09:51:58 -05:00
if r_name > l_name :
return 1
elif r_name == l_name :
return 0
else :
return - 1
class AllowedPVS ( object ) :
"""
We are only allowed to muck with certain PV , filter to only
the ones we can use .
"""
def __init__ ( self ) :
self . handle = None
2013-11-18 15:07:32 -06:00
self . pvs_all = None
2013-07-26 09:51:58 -05:00
def __enter__ ( self ) :
rc = [ ]
allowed_dev = _get_allowed_devices ( )
if allowed_dev :
self . handle = lvm . listPvs ( )
self . pvs_all = self . handle . open ( )
for p in self . pvs_all :
if p . getName ( ) in allowed_dev :
rc . append ( p )
#Sort them consistently
rc . sort ( compare_pv )
return rc
def __exit__ ( self , t_type , value , traceback ) :
if self . handle :
self . pvs_all = None
self . handle . close ( )
2012-10-26 15:35:17 -05:00
class TestLvm ( unittest . TestCase ) :
2013-07-01 17:01:48 -04:00
2014-11-24 00:44:43 +01:00
VG_P = os . environ . get ( ' PREFIX ' )
2013-07-26 09:51:58 -05:00
2013-11-18 15:07:32 -06:00
@staticmethod
def _get_pv_device_names ( ) :
2013-07-01 17:01:48 -04:00
rc = [ ]
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
2013-07-01 17:01:48 -04:00
for p in pvs :
2013-07-26 09:51:58 -05:00
rc . append ( p . getName ( ) )
2013-07-01 17:01:48 -04:00
return rc
2013-11-18 15:07:32 -06:00
@staticmethod
def _create_thick_lv ( device_list , name ) :
2013-07-26 09:51:58 -05:00
vg = lvm . vgCreate ( TestLvm . VG_P + " _ " + name )
2013-07-01 17:01:48 -04:00
for d in device_list :
vg . extend ( d )
2013-11-18 15:07:32 -06:00
vg . createLvLinear ( name , vg . getSize ( ) / 2 )
2013-07-26 09:51:58 -05:00
vg . close ( )
vg = None
2013-07-01 17:01:48 -04:00
2013-11-18 15:07:32 -06:00
@staticmethod
def _create_thin_pool ( device_list , pool_name ) :
2013-07-26 09:51:58 -05:00
vg = lvm . vgCreate ( TestLvm . VG_P + " _ " + pool_name )
for d in device_list :
vg . extend ( d )
2015-04-07 15:36:11 -05:00
vg . createLvThinpool (
pool_name , vg . getSize ( ) / 2 , 0 , 0 , lvm . THIN_DISCARDS_PASSDOWN , 1 )
2013-07-26 09:51:58 -05:00
return vg
2013-11-18 15:07:32 -06:00
@staticmethod
def _create_thin_lv ( pv_devices , name ) :
2013-07-26 09:51:58 -05:00
thin_pool_name = ' thin_vg_pool_ ' + rs ( 4 )
2013-11-18 15:07:32 -06:00
vg = TestLvm . _create_thin_pool ( pv_devices , thin_pool_name )
2015-04-07 15:36:11 -05:00
vg . createLvThin ( thin_pool_name , name , vg . getSize ( ) / 8 )
2013-07-01 17:01:48 -04:00
vg . close ( )
vg = None
2013-11-18 15:07:32 -06:00
@staticmethod
def _vg_names ( ) :
2013-07-26 09:51:58 -05:00
rc = [ ]
vg_names = lvm . listVgNames ( )
for i in vg_names :
if i [ 0 : len ( TestLvm . VG_P ) ] == TestLvm . VG_P :
rc . append ( i )
return rc
2013-11-18 15:07:32 -06:00
@staticmethod
def _get_lv ( lv_vol_type = None , lv_name = None ) :
vg_name_list = TestLvm . _vg_names ( )
2013-07-26 09:51:58 -05:00
for vg_name in vg_name_list :
vg = lvm . vgOpen ( vg_name , " w " )
lvs = vg . listLVs ( )
2013-11-18 15:07:32 -06:00
for lv in lvs :
attr = lv . getAttr ( )
2013-07-26 09:51:58 -05:00
if lv_vol_type or lv_name :
if lv_vol_type is not None and attr [ 0 ] == lv_vol_type :
2013-11-18 15:07:32 -06:00
return lv , vg
elif lv_name is not None and lv_name == lv . getName ( ) :
return lv , vg
2013-07-26 09:51:58 -05:00
else :
2013-11-18 15:07:32 -06:00
return lv , vg
2013-07-26 09:51:58 -05:00
vg . close ( )
return None , None
2013-11-18 15:07:32 -06:00
@staticmethod
def _remove_vg ( vg_name ) :
2013-07-01 17:01:48 -04:00
vg = lvm . vgOpen ( vg_name , ' w ' )
pvs = vg . listPVs ( )
pe_devices = [ ]
#Remove old snapshots first, then lv
2013-11-18 15:07:32 -06:00
for lv in vg . listLVs ( ) :
attr = lv . getAttr ( )
2013-07-01 17:01:48 -04:00
if attr [ 0 ] == ' s ' :
2013-11-18 15:07:32 -06:00
lv . remove ( )
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
lvs = vg . listLVs ( )
#Now remove any thin lVs
2013-11-18 15:07:32 -06:00
for lv in vg . listLVs ( ) :
attr = lv . getAttr ( )
2013-07-26 09:51:58 -05:00
if attr [ 0 ] == ' V ' :
2013-11-18 15:07:32 -06:00
lv . remove ( )
2013-07-26 09:51:58 -05:00
#now remove the rest
2013-11-18 15:07:32 -06:00
for lv in vg . listLVs ( ) :
name = lv . getName ( )
2013-07-26 09:51:58 -05:00
#Don't remove the hidden ones
2013-11-18 15:07:32 -06:00
if ' _tmeta ' not in name and ' _tdata ' not in name :
lv . remove ( )
2013-07-01 17:01:48 -04:00
for p in pvs :
pe_devices . append ( p . getName ( ) )
2013-09-10 17:59:45 -05:00
for pv in pe_devices [ : - 1 ] :
2013-07-01 17:01:48 -04:00
vg . reduce ( pv )
vg . remove ( )
vg . close ( )
2013-11-18 15:07:32 -06:00
@staticmethod
def _clean_up ( ) :
2013-07-26 09:51:58 -05:00
#Clear out the testing PVs, but only if they contain stuff
#this unit test created
2013-11-18 15:07:32 -06:00
for vg_n in TestLvm . _vg_names ( ) :
TestLvm . _remove_vg ( vg_n )
for d in TestLvm . _get_pv_device_names ( ) :
lvm . pvRemove ( d )
lvm . pvCreate ( d )
2013-07-26 09:51:58 -05:00
2012-10-26 15:35:17 -05:00
def setUp ( self ) :
2013-11-18 15:07:32 -06:00
device_list = TestLvm . _get_pv_device_names ( )
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
#Make sure we have an adequate number of PVs to use
2013-07-01 17:01:48 -04:00
self . assertTrue ( len ( device_list ) > = 4 )
2013-11-18 15:07:32 -06:00
TestLvm . _clean_up ( )
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
def tearDown ( self ) :
2013-11-18 15:07:32 -06:00
TestLvm . _clean_up ( )
2013-07-01 17:01:48 -04:00
2013-11-18 15:07:32 -06:00
def test_pv_resize ( self ) :
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
pv = pvs [ 0 ]
2013-07-01 17:01:48 -04:00
curr_size = pv . getSize ( )
dev_size = pv . getDevSize ( )
self . assertTrue ( curr_size == dev_size )
2015-04-07 15:36:11 -05:00
pv . resize ( curr_size / 2 )
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
pv = pvs [ 0 ]
2013-07-01 17:01:48 -04:00
resized_size = pv . getSize ( )
self . assertTrue ( resized_size != curr_size )
pv . resize ( dev_size )
2013-11-18 15:07:32 -06:00
def test_pv_life_cycle ( self ) :
2013-07-01 17:01:48 -04:00
"""
Test removing and re - creating a PV
"""
2013-07-26 09:51:58 -05:00
target_name = None
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
pv = pvs [ 0 ]
target_name = pv . getName ( )
lvm . pvRemove ( target_name )
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
2013-07-01 17:01:48 -04:00
for p in pvs :
2013-07-26 09:51:58 -05:00
self . assertTrue ( p . getName ( ) != target_name )
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
lvm . pvCreate ( target_name , 0 )
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
2013-07-01 17:01:48 -04:00
found = False
for p in pvs :
2013-07-26 09:51:58 -05:00
if p . getName ( ) == target_name :
2013-07-01 17:01:48 -04:00
found = True
self . assertTrue ( found )
2013-11-18 15:07:32 -06:00
@staticmethod
def test_pv_methods ( ) :
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
2013-07-01 17:01:48 -04:00
for p in pvs :
p . getName ( )
p . getUuid ( )
p . getMdaCount ( )
p . getSize ( )
p . getDevSize ( )
p . getFree ( )
p = None
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_version ( self ) :
2012-10-26 15:35:17 -05:00
version = lvm . getVersion ( )
self . assertNotEquals ( version , None )
self . assertEquals ( type ( version ) , str )
self . assertTrue ( len ( version ) > 0 )
2013-11-18 15:07:32 -06:00
def test_pv_getters ( self ) :
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
pv = pvs [ 0 ]
self . assertEqual ( type ( pv . getName ( ) ) , str )
self . assertTrue ( len ( pv . getName ( ) ) > 0 )
2012-10-26 15:35:17 -05:00
2013-07-26 09:51:58 -05:00
self . assertEqual ( type ( pv . getUuid ( ) ) , str )
self . assertTrue ( len ( pv . getUuid ( ) ) > 0 )
2012-10-26 15:35:17 -05:00
2015-04-07 15:36:11 -05:00
self . assertTrue (
type ( pv . getMdaCount ( ) ) == int or
type ( pv . getMdaCount ( ) ) == long )
2012-10-26 15:35:17 -05:00
2015-04-07 15:36:11 -05:00
self . assertTrue (
type ( pv . getSize ( ) ) == int or
type ( pv . getSize ( ) ) == long )
2012-10-26 15:35:17 -05:00
2015-04-07 15:36:11 -05:00
self . assertTrue (
type ( pv . getDevSize ( ) ) == int or
type ( pv . getSize ( ) ) == long )
2012-10-26 15:35:17 -05:00
2015-04-07 15:36:11 -05:00
self . assertTrue (
type ( pv . getFree ( ) ) == int or
type ( pv . getFree ( ) ) == long )
2013-07-01 17:01:48 -04:00
2012-10-26 15:35:17 -05:00
def _test_prop ( self , prop_obj , prop , var_type , settable ) :
result = prop_obj . getProperty ( prop )
self . assertEqual ( type ( result [ 0 ] ) , var_type )
self . assertEqual ( type ( result [ 1 ] ) , bool )
self . assertTrue ( result [ 1 ] == settable )
2013-11-18 15:07:32 -06:00
def test_pv_segs ( self ) :
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
pv = pvs [ 0 ]
pv_segs = pv . listPVsegs ( )
2012-10-26 15:35:17 -05:00
2013-07-26 09:51:58 -05:00
#LVsegs returns a tuple, (value, bool settable)
#TODO: Test other properties of pv_seg
for i in pv_segs :
self . _test_prop ( i , ' pvseg_start ' , long , False )
2013-07-01 17:01:48 -04:00
2013-11-18 15:07:32 -06:00
def test_pv_property ( self ) :
2013-07-26 09:51:58 -05:00
with AllowedPVS ( ) as pvs :
pv = pvs [ 0 ]
self . _test_prop ( pv , ' pv_mda_count ' , long , False )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_lv_property ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
2012-10-26 15:35:17 -05:00
self . _test_prop ( lv , ' seg_count ' , long , False )
2013-07-01 17:01:48 -04:00
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_lv_tags ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . _test_tags ( lv )
2013-07-01 17:01:48 -04:00
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_lv_active_inactive ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
2012-10-26 15:35:17 -05:00
lv . deactivate ( )
2013-07-25 18:24:52 -04:00
self . assertTrue ( lv . isActive ( ) is False )
2012-10-26 15:35:17 -05:00
lv . activate ( )
2013-07-25 18:24:52 -04:00
self . assertTrue ( lv . isActive ( ) is True )
2013-07-01 17:01:48 -04:00
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_lv_rename ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
2012-10-26 15:35:17 -05:00
current_name = lv . getName ( )
new_name = rs ( )
lv . rename ( new_name )
self . assertEqual ( lv . getName ( ) , new_name )
lv . rename ( current_name )
2013-07-01 17:01:48 -04:00
vg . close ( )
2015-04-07 15:25:20 -05:00
def test_lv_persistence ( self ) :
# Make changes to the lv, close the vg and re-open to make sure that
# the changes persist
lv_name = ' lv_test_persist '
TestLvm . _create_thick_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
# Test rename
lv , vg = TestLvm . _get_lv ( None , lv_name )
current_name = lv . getName ( )
new_name = rs ( )
lv . rename ( new_name )
vg . close ( )
vg = None
lv , vg = TestLvm . _get_lv ( None , new_name )
self . assertTrue ( lv is not None )
if lv and vg :
lv . rename ( lv_name )
vg . close ( )
vg = None
# Test lv tag add
tag = ' hello_world '
lv , vg = TestLvm . _get_lv ( None , lv_name )
lv . addTag ( tag )
vg . close ( )
vg = None
lv , vg = TestLvm . _get_lv ( None , lv_name )
tags = lv . getTags ( )
self . assertTrue ( tag in tags )
vg . close ( )
vg = None
# Test lv tag delete
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . assertTrue ( lv is not None and vg is not None )
if lv and vg :
tags = lv . getTags ( )
for t in tags :
lv . removeTag ( t )
vg . close ( )
vg = None
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . assertTrue ( lv is not None and vg is not None )
if lv and vg :
tags = lv . getTags ( )
if tags :
self . assertEqual ( len ( tags ) , 0 )
vg . close ( )
vg = None
# Test lv deactivate
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . assertTrue ( lv is not None and vg is not None )
if lv and vg :
lv . deactivate ( )
vg . close ( )
vg = None
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . assertTrue ( lv is not None and vg is not None )
if lv and vg :
self . assertFalse ( lv . isActive ( ) )
vg . close ( )
vg = None
# Test lv activate
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . assertTrue ( lv is not None and vg is not None )
if lv and vg :
lv . activate ( )
vg . close ( )
vg = None
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . assertTrue ( lv is not None and vg is not None )
if lv and vg :
self . assertTrue ( lv . isActive ( ) )
vg . close ( )
vg = None
2013-11-18 15:07:32 -06:00
def test_lv_snapshot ( self ) :
2013-07-01 17:01:48 -04:00
2013-07-26 09:51:58 -05:00
thin_lv = ' thin_lv '
thick_lv = ' thick_lv '
2013-07-01 17:01:48 -04:00
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
2013-07-01 17:01:48 -04:00
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( device_names [ 0 : 2 ] , thin_lv )
TestLvm . _create_thick_lv ( device_names [ 2 : 4 ] , thick_lv )
2013-07-01 17:01:48 -04:00
2013-11-18 15:07:32 -06:00
lv , vg = TestLvm . _get_lv ( None , thick_lv )
2014-06-05 23:07:23 +02:00
# FIXME lv.snapshot('thick_snap_shot', 1024*1024)
2013-07-01 17:01:48 -04:00
vg . close ( )
2014-06-05 23:07:23 +02:00
# FIXME thick_ss, vg = TestLvm._get_lv(None, 'thick_snap_shot')
# FIXME self.assertTrue(thick_ss is not None)
# FIXME vg.close()
2013-07-01 17:01:48 -04:00
2013-11-18 15:07:32 -06:00
thin_lv , vg = TestLvm . _get_lv ( None , thin_lv )
2013-07-26 09:51:58 -05:00
thin_lv . snapshot ( ' thin_snap_shot ' )
2013-07-01 17:01:48 -04:00
vg . close ( )
2013-11-18 15:07:32 -06:00
thin_ss , vg = TestLvm . _get_lv ( None , ' thin_snap_shot ' )
2013-07-01 17:01:48 -04:00
self . assertTrue ( thin_ss is not None )
origin = thin_ss . getOrigin ( )
2013-07-26 09:51:58 -05:00
self . assertTrue ( thin_lv , origin )
2013-07-01 17:01:48 -04:00
vg . close ( )
2013-11-18 15:07:32 -06:00
def test_lv_suspend ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
2012-10-26 15:35:17 -05:00
result = lv . isSuspended ( )
2013-07-01 17:01:48 -04:00
self . assertTrue ( type ( result ) == bool )
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_lv_size ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
2013-07-26 09:51:58 -05:00
2012-10-26 15:35:17 -05:00
result = lv . getSize ( )
2013-07-25 18:24:52 -04:00
self . assertTrue ( type ( result ) == int or type ( result ) == long )
2013-07-01 17:01:48 -04:00
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_lv_resize ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
2013-07-26 09:51:58 -05:00
2013-07-01 17:01:48 -04:00
curr_size = lv . getSize ( )
2015-04-07 15:36:11 -05:00
lv . resize ( curr_size + ( 1024 * 1024 ) )
2013-07-01 17:01:48 -04:00
latest = lv . getSize ( )
self . assertTrue ( curr_size != latest )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_lv_seg ( self ) :
2013-07-26 09:51:58 -05:00
lv_name = ' lv_test '
2013-11-18 15:07:32 -06:00
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
2012-10-26 15:35:17 -05:00
lv_segs = lv . listLVsegs ( )
#LVsegs returns a tuple, (value, bool settable)
#TODO: Test other properties of lv_seg
for i in lv_segs :
self . _test_prop ( i , ' seg_start_pe ' , long , False )
2013-07-01 17:01:48 -04:00
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_get_set_extend_size ( self ) :
2013-07-26 09:51:58 -05:00
thick_lv = ' get_set_prop '
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thick_lv ( device_names [ 0 : 2 ] , thick_lv )
lv , vg = TestLvm . _get_lv ( None , thick_lv )
2013-07-26 09:51:58 -05:00
new_extent = 1024 * 1024 * 4
2015-04-07 15:36:11 -05:00
self . assertFalse (
vg . getExtentSize ( ) != new_extent ,
" Cannot determine if it works if they are the same " )
2013-07-26 09:51:58 -05:00
vg . setExtentSize ( new_extent )
self . assertEqual ( vg . getExtentSize ( ) , new_extent )
vg . close ( )
2013-11-18 15:07:32 -06:00
def test_vg_get_set_prop ( self ) :
2013-07-26 09:51:58 -05:00
thick_lv = ' get_set_prop '
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thick_lv ( device_names [ 0 : 2 ] , thick_lv )
lv , vg = TestLvm . _get_lv ( None , thick_lv )
2012-10-26 15:35:17 -05:00
2013-07-01 17:01:48 -04:00
self . assertTrue ( vg is not None )
if vg :
2012-10-26 15:35:17 -05:00
vg_mda_copies = vg . getProperty ( ' vg_mda_copies ' )
vg . setProperty ( ' vg_mda_copies ' , vg_mda_copies [ 0 ] )
vg . close ( )
2013-11-18 15:07:32 -06:00
def test_vg_remove_restore ( self ) :
2013-07-01 17:01:48 -04:00
#Store off the list of physical devices
2013-07-26 09:51:58 -05:00
pv_devices = [ ]
thick_lv = ' get_set_prop '
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thick_lv ( device_names [ 0 : 2 ] , thick_lv )
lv , vg = TestLvm . _get_lv ( None , thick_lv )
2013-07-26 09:51:58 -05:00
vg_name = vg . getName ( )
2012-10-26 15:35:17 -05:00
2013-07-01 17:01:48 -04:00
pvs = vg . listPVs ( )
for p in pvs :
2013-07-26 09:51:58 -05:00
pv_devices . append ( p . getName ( ) )
2013-07-01 17:01:48 -04:00
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
TestLvm . _remove_vg ( vg_name )
self . _create_thick_lv ( pv_devices , thick_lv )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_vg_names ( self ) :
2012-10-26 15:35:17 -05:00
vg = lvm . listVgNames ( )
self . assertTrue ( isinstance ( vg , tuple ) )
2013-11-18 15:07:32 -06:00
def test_dupe_lv_create ( self ) :
2012-10-26 15:35:17 -05:00
"""
Try to create a lv with the same name expecting a failure
Note : This was causing a seg . fault previously
"""
2013-07-26 09:51:58 -05:00
thick_lv = ' dupe_name '
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thick_lv ( device_names [ 0 : 2 ] , thick_lv )
lv , vg = TestLvm . _get_lv ( None , thick_lv )
2012-10-26 15:35:17 -05:00
2013-07-26 09:51:58 -05:00
self . assertTrue ( vg is not None )
2012-10-26 15:35:17 -05:00
2013-07-26 09:51:58 -05:00
if vg :
2012-10-26 15:35:17 -05:00
lvs = vg . listLVs ( )
if len ( lvs ) :
lv = lvs [ 0 ]
lv_name = lv . getName ( )
2015-04-07 15:36:11 -05:00
self . assertRaises (
lvm . LibLVMError , vg . createLvLinear , lv_name , lv . getSize ( ) )
2013-07-26 09:51:58 -05:00
vg . close ( )
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_vg_uuids ( self ) :
2013-07-26 09:51:58 -05:00
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thin_lv ( device_names [ 0 : 2 ] , ' thin ' )
TestLvm . _create_thick_lv ( device_names [ 2 : 4 ] , ' thick ' )
2013-07-26 09:51:58 -05:00
2012-10-26 15:35:17 -05:00
vgs_uuids = lvm . listVgUuids ( )
2013-07-26 09:51:58 -05:00
self . assertTrue ( len ( vgs_uuids ) > 0 )
2012-10-26 15:35:17 -05:00
self . assertTrue ( isinstance ( vgs_uuids , tuple ) )
vgs_uuids = list ( vgs_uuids )
vgs_names = lvm . listVgNames ( )
for vg_name in vgs_names :
vg = lvm . vgOpen ( vg_name , " r " )
2013-07-01 17:01:48 -04:00
#TODO Write/fix BUG, vg uuid don't match between
#lvm.listVgUuids and vg.getUuid()
2012-10-26 15:35:17 -05:00
vg_uuid_search = vg . getUuid ( ) . replace ( ' - ' , ' ' )
self . assertTrue ( vg_uuid_search in vgs_uuids )
vgs_uuids . remove ( vg_uuid_search )
2013-07-01 17:01:48 -04:00
vg . close ( )
2012-10-26 15:35:17 -05:00
self . assertTrue ( len ( vgs_uuids ) == 0 )
2013-11-18 15:07:32 -06:00
def test_pv_lookup_from_vg ( self ) :
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thin_lv ( device_names [ 0 : 2 ] , ' thin ' )
TestLvm . _create_thick_lv ( device_names [ 2 : 4 ] , ' thick ' )
2013-07-26 09:51:58 -05:00
2013-11-18 15:07:32 -06:00
vg_names = TestLvm . _vg_names ( )
2013-07-26 09:51:58 -05:00
self . assertTrue ( len ( vg_names ) > 0 )
2013-07-01 17:01:48 -04:00
for vg_name in vg_names :
vg = lvm . vgOpen ( vg_name , ' w ' )
pvs = vg . listPVs ( )
for p in pvs :
name = p . getName ( )
uuid = p . getUuid ( )
pv_name_lookup = vg . pvFromName ( name )
pv_uuid_lookup = vg . pvFromUuid ( uuid )
2015-04-07 15:36:11 -05:00
self . assertTrue (
pv_name_lookup . getName ( ) == pv_uuid_lookup . getName ( ) )
self . assertTrue (
pv_name_lookup . getUuid ( ) == pv_uuid_lookup . getUuid ( ) )
2013-07-01 17:01:48 -04:00
self . assertTrue ( name == pv_name_lookup . getName ( ) )
self . assertTrue ( uuid == pv_uuid_lookup . getUuid ( ) )
pv_name_lookup = None
pv_uuid_lookup = None
p = None
pvs = None
vg . close ( )
2013-11-18 15:07:32 -06:00
def test_percent_to_float ( self ) :
2012-10-26 15:35:17 -05:00
self . assertEqual ( lvm . percentToFloat ( 0 ) , 0.0 )
self . assertEqual ( lvm . percentToFloat ( 1000000 ) , 1.0 )
self . assertEqual ( lvm . percentToFloat ( 1000000 / 2 ) , 0.5 )
2013-11-18 15:07:32 -06:00
def test_scan ( self ) :
2012-10-26 15:35:17 -05:00
self . assertEqual ( lvm . scan ( ) , None )
2013-11-18 15:07:32 -06:00
def test_config_reload ( self ) :
2012-10-26 15:35:17 -05:00
self . assertEqual ( lvm . configReload ( ) , None )
2013-11-18 15:07:32 -06:00
def test_config_override ( self ) :
2012-10-26 15:35:17 -05:00
self . assertEquals ( lvm . configOverride ( " global.test = 1 " ) , None )
2013-11-18 15:07:32 -06:00
def test_config_find_bool ( self ) :
2013-07-01 17:01:48 -04:00
either_or = lvm . configFindBool ( " global/fallback_to_local_locking " )
self . assertTrue ( type ( either_or ) == bool )
2012-10-26 15:35:17 -05:00
self . assertTrue ( lvm . configFindBool ( " global/locking_type " ) )
2013-11-18 15:07:32 -06:00
def test_vg_from_pv_lookups ( self ) :
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thin_lv ( device_names [ 0 : 2 ] , ' thin ' )
TestLvm . _create_thick_lv ( device_names [ 2 : 4 ] , ' thick ' )
2013-07-26 09:51:58 -05:00
2013-11-18 15:07:32 -06:00
vgname_list = TestLvm . _vg_names ( )
2013-07-26 09:51:58 -05:00
self . assertTrue ( len ( vgname_list ) > 0 )
2012-10-26 15:35:17 -05:00
for vg_name in vgname_list :
vg = lvm . vgOpen ( vg_name , ' r ' )
vg_name = vg . getName ( )
pv_list = vg . listPVs ( )
for pv in pv_list :
vg_name_from_pv = lvm . vgNameFromPvid ( pv . getUuid ( ) )
self . assertEquals ( vg_name , vg_name_from_pv )
self . assertEqual ( vg_name , lvm . vgNameFromDevice ( pv . getName ( ) ) )
vg . close ( )
2013-11-18 15:07:32 -06:00
def test_vg_get_name ( self ) :
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thin_lv ( device_names [ 0 : 2 ] , ' thin ' )
TestLvm . _create_thick_lv ( device_names [ 2 : 4 ] , ' thick ' )
2013-07-26 09:51:58 -05:00
2013-11-18 15:07:32 -06:00
vgname_list = TestLvm . _vg_names ( )
2013-07-26 09:51:58 -05:00
self . assertTrue ( len ( vgname_list ) > 0 )
2012-10-26 15:35:17 -05:00
for vg_name in vgname_list :
vg = lvm . vgOpen ( vg_name , ' r ' )
self . assertEqual ( vg . getName ( ) , vg_name )
vg . close ( )
2013-11-18 15:07:32 -06:00
def test_vg_get_uuid ( self ) :
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thin_lv ( device_names [ 0 : 2 ] , ' thin ' )
TestLvm . _create_thick_lv ( device_names [ 2 : 4 ] , ' thick ' )
2013-07-26 09:51:58 -05:00
2013-11-18 15:07:32 -06:00
vgname_list = TestLvm . _vg_names ( )
2013-07-26 09:51:58 -05:00
self . assertTrue ( len ( vgname_list ) > 0 )
2012-10-26 15:35:17 -05:00
for vg_name in vgname_list :
vg = lvm . vgOpen ( vg_name , ' r ' )
uuid = vg . getUuid ( )
self . assertNotEqual ( uuid , None )
self . assertTrue ( len ( uuid ) > 0 )
vg . close ( )
2015-04-07 15:36:11 -05:00
RETURN_NUMERIC = [
" getSeqno " , " getSize " , " getFreeSize " , " getFreeSize " ,
" getExtentSize " , " getExtentCount " , " getFreeExtentCount " ,
" getPvCount " , " getMaxPv " , " getMaxLv " ]
2012-10-26 15:35:17 -05:00
2013-11-18 15:07:32 -06:00
def test_vg_getters ( self ) :
device_names = TestLvm . _get_pv_device_names ( )
TestLvm . _create_thin_lv ( device_names [ 0 : 2 ] , ' thin ' )
TestLvm . _create_thick_lv ( device_names [ 2 : 4 ] , ' thick ' )
2013-07-26 09:51:58 -05:00
2013-11-18 15:07:32 -06:00
vg_name_list = TestLvm . _vg_names ( )
2013-07-26 09:51:58 -05:00
self . assertTrue ( len ( vg_name_list ) > 0 )
2012-10-26 15:35:17 -05:00
for vg_name in vg_name_list :
vg = lvm . vgOpen ( vg_name , ' r ' )
self . assertTrue ( type ( vg . isClustered ( ) ) == bool )
self . assertTrue ( type ( vg . isExported ( ) ) == bool )
self . assertTrue ( type ( vg . isPartial ( ) ) == bool )
#Loop through the list invoking the method
for method_name in TestLvm . RETURN_NUMERIC :
method = getattr ( vg , method_name )
result = method ( )
2013-07-25 18:24:52 -04:00
self . assertTrue ( type ( result ) == int or type ( result ) == long )
2012-10-26 15:35:17 -05:00
vg . close ( )
2013-11-18 15:07:32 -06:00
def _test_tags ( self , tag_obj ) :
2012-10-26 15:35:17 -05:00
existing_tags = tag_obj . getTags ( )
self . assertTrue ( type ( existing_tags ) == tuple )
num_tags = random . randint ( 2 , 40 )
created_tags = [ ]
for i in range ( num_tags ) :
tag_name = rs ( random . randint ( 1 , 128 ) )
tag_obj . addTag ( tag_name )
created_tags . append ( tag_name )
tags = tag_obj . getTags ( )
self . assertTrue ( len ( existing_tags ) + len ( created_tags ) == len ( tags ) )
num_remove = len ( created_tags )
for i in range ( num_remove ) :
2013-07-25 18:24:52 -04:00
tag_to_remove = created_tags [
random . randint ( 0 , len ( created_tags ) - 1 ) ]
2012-10-26 15:35:17 -05:00
created_tags . remove ( tag_to_remove )
tag_obj . removeTag ( tag_to_remove )
current_tags = tag_obj . getTags ( )
self . assertFalse ( tag_to_remove in current_tags )
current_tags = tag_obj . getTags ( )
self . assertTrue ( len ( current_tags ) == len ( existing_tags ) )
for e in existing_tags :
self . assertTrue ( e in current_tags )
2013-11-18 15:07:32 -06:00
def test_vg_tags ( self ) :
device_names = TestLvm . _get_pv_device_names ( )
2012-10-26 15:35:17 -05:00
2013-07-26 09:51:58 -05:00
i = 0
for d in device_names :
if i % 2 == 0 :
2015-04-07 15:36:11 -05:00
TestLvm . _create_thin_lv ( [ d ] , " thin_lv %d " % i )
2013-07-26 09:51:58 -05:00
else :
2013-11-18 15:07:32 -06:00
TestLvm . _create_thick_lv ( [ d ] , " thick_lv %d " % i )
2013-07-26 09:51:58 -05:00
i + = 1
2013-11-18 15:07:32 -06:00
for vg_name in TestLvm . _vg_names ( ) :
2012-10-26 15:35:17 -05:00
vg = lvm . vgOpen ( vg_name , ' w ' )
2013-11-18 15:07:32 -06:00
self . _test_tags ( vg )
2012-10-26 15:35:17 -05:00
vg . close ( )
2013-11-18 15:07:32 -06:00
@staticmethod
def test_listing ( ) :
2013-09-11 18:13:18 -05:00
env = os . environ
for k , v in env . items ( ) :
l ( " %s : %s " % ( k , v ) )
with lvm . listPvs ( ) as pvs :
for p in pvs :
l ( ' pv= %s ' % p . getName ( ) )
l ( ' Checking for VG ' )
for v in lvm . listVgNames ( ) :
l ( ' vg= %s ' % v )
2013-11-18 15:07:32 -06:00
def test_pv_empty_listing ( self ) :
2013-09-11 18:13:18 -05:00
#We had a bug where we would seg. fault if we had no PVs.
l ( ' testPVemptylisting entry ' )
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
2013-09-11 18:13:18 -05:00
for d in device_names :
l ( " Removing %s " % d )
lvm . pvRemove ( d )
count = 0
with lvm . listPvs ( ) as pvs :
for p in pvs :
count + = 1
l ( ' pv= %s ' % p . getName ( ) )
self . assertTrue ( count == 0 )
for d in device_names :
lvm . pvCreate ( d )
2013-11-18 15:07:32 -06:00
def test_pv_create ( self ) :
2015-04-07 15:36:11 -05:00
size = [ 0 , 1024 * 1024 * 4 ]
2013-09-11 18:13:18 -05:00
pvmeta_copies = [ 0 , 1 , 2 ]
pvmeta_size = [ 0 , 255 , 512 , 1024 ]
data_alignment = [ 0 , 2048 , 4096 ]
data_alignment_offset = [ 1 , 1 , 1 ]
zero = [ 0 , 1 ]
2013-11-18 15:07:32 -06:00
device_names = TestLvm . _get_pv_device_names ( )
2013-09-11 18:13:18 -05:00
for d in device_names :
lvm . pvRemove ( d )
d = device_names [ 0 ]
#Test some error cases
self . assertRaises ( TypeError , lvm . pvCreate , None )
self . assertRaises ( lvm . LibLVMError , lvm . pvCreate , ' ' )
self . assertRaises ( lvm . LibLVMError , lvm . pvCreate , d , 4 )
self . assertRaises ( lvm . LibLVMError , lvm . pvCreate , d , 0 , 4 )
2015-04-07 15:36:11 -05:00
self . assertRaises ( lvm . LibLVMError , lvm . pvCreate , d , 0 , 0 , 0 , 2 * * 34 )
self . assertRaises (
lvm . LibLVMError , lvm . pvCreate , d , 0 , 0 , 0 , 4096 , 2 * * 34 )
2013-09-11 18:13:18 -05:00
#Try a number of combinations and permutations
for s in size :
lvm . pvCreate ( d , s )
lvm . pvRemove ( d )
for copies in pvmeta_copies :
lvm . pvCreate ( d , s , copies )
lvm . pvRemove ( d )
for pv_size in pvmeta_size :
lvm . pvCreate ( d , s , copies , pv_size )
lvm . pvRemove ( d )
for align in data_alignment :
lvm . pvCreate ( d , s , copies , pv_size , align )
lvm . pvRemove ( d )
for align_offset in data_alignment_offset :
2015-04-07 15:36:11 -05:00
lvm . pvCreate (
d , s , copies , pv_size , align ,
align * align_offset )
2013-09-11 18:13:18 -05:00
lvm . pvRemove ( d )
for z in zero :
2015-04-07 15:36:11 -05:00
lvm . pvCreate (
d , s , copies , pv_size , align ,
align * align_offset , z )
2013-09-11 18:13:18 -05:00
lvm . pvRemove ( d )
#Restore
for d in device_names :
lvm . pvCreate ( d )
2013-11-18 15:09:09 -06:00
def test_vg_reduce ( self ) :
# Test the case where we try to reduce a vg where the last PV has
# no metadata copies. In this case the reduce should fail.
vg_name = TestLvm . VG_P + ' reduce_test '
device_names = TestLvm . _get_pv_device_names ( )
for d in device_names :
lvm . pvRemove ( d )
lvm . pvCreate ( device_names [ 0 ] , 0 , 0 ) # Size all, pvmetadatacopies 0
lvm . pvCreate ( device_names [ 1 ] )
lvm . pvCreate ( device_names [ 2 ] )
lvm . pvCreate ( device_names [ 3 ] )
vg = lvm . vgCreate ( vg_name )
vg . extend ( device_names [ 3 ] )
vg . extend ( device_names [ 2 ] )
vg . extend ( device_names [ 1 ] )
vg . extend ( device_names [ 0 ] )
vg . close ( )
vg = None
vg = lvm . vgOpen ( vg_name , ' w ' )
vg . reduce ( device_names [ 3 ] )
vg . reduce ( device_names [ 2 ] )
self . assertRaises ( lvm . LibLVMError , vg . reduce , device_names [ 1 ] )
vg . close ( )
vg = None
vg = lvm . vgOpen ( vg_name , ' w ' )
vg . remove ( )
vg . close ( )
@staticmethod
def _test_valid_names ( method ) :
sample = ' azAZ09._-+ '
method ( ' x ' * 127 )
method ( ' .X ' )
method ( ' ..X ' )
for i in range ( 1 , 7 ) :
tests = ( ' ' . join ( i ) for i in itertools . product ( sample , repeat = i ) )
for t in tests :
if t == ' . ' or t == ' .. ' :
t + = ' X '
elif t . startswith ( ' - ' ) :
t = ' H ' + t
method ( t )
def _test_bad_names ( self , method , dupe_name ) :
2015-04-07 15:36:11 -05:00
# Test for duplicate name
2013-11-18 15:09:09 -06:00
self . assertRaises ( lvm . LibLVMError , method , dupe_name )
# Test for too long a name
self . assertRaises ( lvm . LibLVMError , method , ( ' x ' * 128 ) )
# Test empty
self . assertRaises ( lvm . LibLVMError , method , ' ' )
# Invalid characters
self . assertRaises ( lvm . LibLVMError , method , ' &invalid^char ' )
# Cannot start with .. and no following characters
self . assertRaises ( lvm . LibLVMError , method , ' .. ' )
# Cannot start with . and no following characters
self . assertRaises ( lvm . LibLVMError , method , ' . ' )
# Cannot start with a hyphen
self . assertRaises ( lvm . LibLVMError , method , ' -not_good ' )
def _lv_reserved_names ( self , method ) :
prefixes = [ ' snapshot ' , ' pvmove ' ]
2015-04-07 15:36:11 -05:00
reserved = [
' _mlog ' , ' _mimage ' , ' _pmspare ' , ' _rimage ' , ' _rmeta ' ,
' _vorigin ' , ' _tdata ' , ' _tmeta ' ]
2013-11-18 15:09:09 -06:00
for p in prefixes :
self . assertRaises ( lvm . LibLVMError , method , p + rs ( 3 ) )
for r in reserved :
self . assertRaises ( lvm . LibLVMError , method , rs ( 3 ) + r + rs ( 1 ) )
self . assertRaises ( lvm . LibLVMError , method , r + rs ( 1 ) )
def test_vg_lv_name_validate ( self ) :
lv_name = ' vg_lv_name_validate '
TestLvm . _create_thin_lv ( TestLvm . _get_pv_device_names ( ) , lv_name )
lv , vg = TestLvm . _get_lv ( None , lv_name )
self . _test_bad_names ( lvm . vgNameValidate , vg . getName ( ) )
self . _test_bad_names ( vg . lvNameValidate , lv . getName ( ) )
# Test good values
TestLvm . _test_valid_names ( lvm . vgNameValidate )
TestLvm . _test_valid_names ( vg . lvNameValidate )
self . _lv_reserved_names ( vg . lvNameValidate )
vg . close ( )
2012-10-26 15:35:17 -05:00
if __name__ == " __main__ " :
2013-06-18 15:13:27 -04:00
unittest . main ( )