#!/usr/bin/env python

# Copyright (C) 2012-2013 Red Hat, Inc. All rights reserved.
#
# This file is part of LVM2.
#
# This copyrighted material is made available to anyone wishing to use,
# modify, copy, or redistribute it subject to the terms and conditions
# of the GNU General Public License v.2.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA

import unittest
import random
import string
import lvm
import os
import itertools

# Set of basic unit tests for the python bindings.
#
# *** WARNING ***
#
# This test tries to only modify configuration for the list of allowed
# PVs, but an error in it could potentially cause data loss if run on a
# production system.  Therefore it is strongly advised that this unit test
# not be run on a system that contains data of value.

# Log file handle shared by l(); stays None until l() opens the log file
# on its first write with PY_UNIT_LOG set.
fh = None


def l(txt):
	"""
	Append txt as one line to the debug log when PY_UNIT_LOG is set.

	The log file is opened on first use under /tmp with a random suffix and
	is kept open in the module-level fh for later calls.  When the
	environment variable is absent the call does nothing.
	"""
	global fh

	if os.environ.get('PY_UNIT_LOG') is None:
		return

	if fh is None:
		fh = open('/tmp/lvm_py_unit_test_' + rs(10), "a")

	fh.write(txt + "\n")
	fh.flush()


def rs(rand_len=10):
	"""
	Return a random string of rand_len upper case ASCII letters.
	"""
	letters = [random.choice(string.ascii_uppercase)
			   for _ in range(rand_len)]
	return ''.join(letters)


def _get_allowed_devices():
	"""
	Return the sorted device names listed in PY_UNIT_PVS, one per line.

	Returns None when the environment variable is not set.
	"""
	raw = os.environ.get('PY_UNIT_PVS')
	if raw is None:
		return None
	return sorted(raw.splitlines())


def compare_pv(right, left):
	"""
	cmp-style comparison of two PVs by the value of getName().

	Returns 1 when right's name is greater than left's, 0 when the names
	are equal and -1 otherwise.
	"""
	r_name, l_name = right.getName(), left.getName()

	if r_name == l_name:
		return 0
	return 1 if r_name > l_name else -1


class AllowedPVS(object):
	"""
	We are only allowed to muck with certain PV, filter to only
	the ones we can use.

	Used as a context manager: entering opens the lvm PV listing and returns
	the PVs whose names are in the allowed device list, sorted by name;
	leaving closes the listing again.  When no devices are allowed the
	listing is never opened and an empty list is returned.
	"""

	def __init__(self):
		self.handle = None
		self.pvs_all = None

	def __enter__(self):
		rc = []

		allowed_dev = _get_allowed_devices()

		if allowed_dev:
			self.handle = lvm.listPvs()
			self.pvs_all = self.handle.open()

			# A set keeps the membership test cheap for long PV listings
			allowed = set(allowed_dev)
			rc = [p for p in self.pvs_all if p.getName() in allowed]

		# Sort them consistently.  A key function is used because a cmp
		# function passed positionally is only accepted by Python 2.
		rc.sort(key=lambda p: p.getName())
		return rc

	def __exit__(self, t_type, value, traceback):
		if self.handle:
			# Drop the reference to the PV list before closing the handle
			# that produced it.
			self.pvs_all = None
			self.handle.close()


class TestLvm(unittest.TestCase):

	# Prefix of the name of every VG created by this test, taken from the
	# PREFIX environment variable.
	# NOTE(review): this is None when PREFIX is unset, which makes the
	# string concatenations using it raise -- confirm the harness sets it.
	VG_P = os.environ.get('PREFIX')

	@staticmethod
	def _get_pv_device_names():
		"""
		Return the names of the PVs this test may use, in the order
		AllowedPVS yields them.
		"""
		with AllowedPVS() as pvs:
			return [pv.getName() for pv in pvs]

	@staticmethod
	def _create_thick_lv(device_list, name):
		"""
		Create a VG named <prefix>_<name> on device_list holding one linear
		LV called name that is half the size of the VG, then close the VG.
		"""
		vg_name = TestLvm.VG_P + "_" + name
		vg = lvm.vgCreate(vg_name)

		for device in device_list:
			vg.extend(device)

		half_of_vg = vg.getSize() / 2
		vg.createLvLinear(name, half_of_vg)
		vg.close()

	@staticmethod
	def _create_thin_pool(device_list, pool_name):
		"""
		Create a VG named <prefix>_<pool_name> on device_list with a thin
		pool called pool_name sized at half of the VG.

		Returns the VG without closing it; the caller has to do that.
		"""
		vg_name = TestLvm.VG_P + "_" + pool_name
		vg = lvm.vgCreate(vg_name)

		for device in device_list:
			vg.extend(device)

		pool_size = vg.getSize()/2
		vg.createLvThinpool(pool_name, pool_size, 0, 0,
							lvm.THIN_DISCARDS_PASSDOWN, 1)
		return vg

	@staticmethod
	def _create_thin_lv(pv_devices, name):
		"""
		Create a thin LV called name, an eighth of the VG in size, inside a
		newly created thin pool with a randomised name on pv_devices.
		"""
		pool_name = 'thin_vg_pool_' + rs(4)
		vg = TestLvm._create_thin_pool(pv_devices, pool_name)

		thin_size = vg.getSize()/8
		vg.createLvThin(pool_name, name, thin_size)
		vg.close()

	@staticmethod
	def _vg_names():
		"""
		Return the names of all VGs whose name starts with the test prefix.
		"""
		prefix = TestLvm.VG_P
		return [n for n in lvm.listVgNames() if n.startswith(prefix)]

	@staticmethod
	def _get_lv(lv_vol_type=None, lv_name=None):
		"""
		Find an LV in the VGs created by this test.

		With neither argument given the first LV found is returned.
		Otherwise an LV matches when the first character of its attribute
		string equals lv_vol_type or when its name equals lv_name.

		Returns (lv, vg) with the VG still open in write mode for the caller
		to close, or (None, None) when nothing matches.
		"""
		want_any = not (lv_vol_type or lv_name)

		for vg_name in TestLvm._vg_names():
			vg = lvm.vgOpen(vg_name, "w")

			for lv in vg.listLVs():
				attr = lv.getAttr()
				if want_any:
					return lv, vg
				if lv_vol_type is not None and attr[0] == lv_vol_type:
					return lv, vg
				if lv_name is not None and lv_name == lv.getName():
					return lv, vg

			# Nothing suitable in this VG, release it before moving on
			vg.close()

		return None, None

	@staticmethod
	def _remove_vg(vg_name):
		"""
		Remove the VG vg_name together with the LVs it contains.

		LVs are removed in three passes (snapshots, thin LVs, the rest),
		then all PVs but the last are taken out of the VG before the VG
		itself is removed and closed.
		"""
		vg = lvm.vgOpen(vg_name, 'w')

		pvs = vg.listPVs()

		pe_devices = []

		# Remove old snapshots first (attribute string starts with 's'),
		# then the other LVs
		for lv in vg.listLVs():
			attr = lv.getAttr()
			if attr[0] == 's':
				lv.remove()

		# NOTE(review): the result of this call is never used
		lvs = vg.listLVs()

		# Now remove any thin LVs (attribute string starts with 'V')
		for lv in vg.listLVs():
			attr = lv.getAttr()
			if attr[0] == 'V':
				lv.remove()

		# Now remove the rest
		for lv in vg.listLVs():
			name = lv.getName()

			# Don't remove the hidden ones
			if '_tmeta' not in name and '_tdata' not in name:
				lv.remove()

		# Collect the device names from the PV list taken at the start
		for p in pvs:
			pe_devices.append(p.getName())

		# Every PV except the last one is taken out of the VG
		# NOTE(review): presumably the last PV has to stay until the VG is
		# removed -- confirm.
		for pv in pe_devices[:-1]:
			vg.reduce(pv)

		vg.remove()
		vg.close()

	@staticmethod
	def _clean_up():
		"""
		Remove every VG carrying the test prefix, then re-initialise each
		allowed PV by removing and creating it again.
		"""
		# Only VGs this unit test created are touched
		for vg_name in TestLvm._vg_names():
			TestLvm._remove_vg(vg_name)

		for device in TestLvm._get_pv_device_names():
			lvm.pvRemove(device)
			lvm.pvCreate(device)

	def setUp(self):
		"""
		Check that at least four usable PVs exist and clean them up.
		"""
		usable = TestLvm._get_pv_device_names()

		# Several tests spread their VGs over four devices
		self.assertTrue(len(usable) >= 4)
		TestLvm._clean_up()

	def tearDown(self):
		"""
		Remove the test VGs and re-create the allowed PVs after each test.
		"""
		TestLvm._clean_up()

	def test_pv_resize(self):
		"""
		Resize the first PV to half its size, then back to the device size.
		"""
		with AllowedPVS() as pvs:
			first = pvs[0]
			original_size = first.getSize()
			device_size = first.getDevSize()
			self.assertTrue(original_size == device_size)
			first.resize(original_size/2)

		# Read the size back through a fresh PV listing
		with AllowedPVS() as pvs:
			first = pvs[0]
			self.assertTrue(first.getSize() != original_size)
			first.resize(device_size)

	def test_pv_life_cycle(self):
		"""
		Test removing and re-creating a PV
		"""
		with AllowedPVS() as pvs:
			target_name = pvs[0].getName()
			lvm.pvRemove(target_name)

		# The removed device must no longer be listed
		with AllowedPVS() as pvs:
			for p in pvs:
				self.assertTrue(p.getName() != target_name)

		lvm.pvCreate(target_name, 0)

		# ... and must show up again once it has been re-created
		with AllowedPVS() as pvs:
			names = [p.getName() for p in pvs]

		self.assertTrue(target_name in names)

	@staticmethod
	def test_pv_methods():
		"""
		Call the basic getters on each allowed PV; nothing is asserted, the
		test passes as long as none of the calls raises.
		"""
		getters = ('getName', 'getUuid', 'getMdaCount', 'getSize',
				   'getDevSize', 'getFree')

		with AllowedPVS() as pvs:
			for pv in pvs:
				for getter in getters:
					getattr(pv, getter)()
				pv = None

	def test_version(self):
		"""
		lvm.getVersion() returns a non-empty str.
		"""
		version = lvm.getVersion()
		# assertEqual/assertNotEqual are used instead of their deprecated
		# assertEquals/assertNotEquals aliases.
		self.assertNotEqual(version, None)
		self.assertEqual(type(version), str)
		self.assertTrue(len(version) > 0)

	def test_pv_getters(self):
		"""
		Check the types returned by the getters of the first allowed PV:
		name and uuid are non-empty str, the counters and sizes are int or
		long.
		"""
		with AllowedPVS() as pvs:
			pv = pvs[0]

			for text_getter in (pv.getName, pv.getUuid):
				text = text_getter()
				self.assertEqual(type(text), str)
				self.assertTrue(len(text) > 0)

			# Each numeric getter is validated against its own return value
			for num_getter in (pv.getMdaCount, pv.getSize, pv.getDevSize,
							   pv.getFree):
				value = num_getter()
				self.assertTrue(type(value) == int or type(value) == long)

	def _test_prop(self, prop_obj, prop, var_type, settable):
		result = prop_obj.getProperty(prop)

		self.assertEqual(type(result[0]), var_type)
		self.assertEqual(type(result[1]), bool)
		self.assertTrue(result[1] == settable)

	def test_pv_segs(self):
		"""
		Every segment of the first allowed PV reports pvseg_start as a long
		that is not settable.
		"""
		with AllowedPVS() as pvs:
			# getProperty returns a tuple, (value, bool settable)
			# TODO: Test other properties of pv_seg
			for seg in pvs[0].listPVsegs():
				self._test_prop(seg, 'pvseg_start', long, False)

	def test_pv_property(self):
		"""
		pv_mda_count of the first allowed PV is a long that is not settable.
		"""
		with AllowedPVS() as pvs:
			first = pvs[0]
			self._test_prop(first, 'pv_mda_count', long, False)

	def test_lv_property(self):
		"""
		seg_count of a thin LV is a long that is not settable.
		"""
		name = 'lv_test'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)

		lv, vg = TestLvm._get_lv(None, name)
		self._test_prop(lv, 'seg_count', long, False)
		vg.close()

	def test_lv_tags(self):
		"""
		Run the shared tag add/remove checks against a thin LV.
		"""
		name = 'lv_test'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)

		lv, vg = TestLvm._get_lv(None, name)
		self._test_tags(lv)
		vg.close()

	def test_lv_active_inactive(self):
		"""
		isActive() is False after deactivate() and True after activate().
		"""
		name = 'lv_test'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)
		lv, vg = TestLvm._get_lv(None, name)

		for toggle, expected in ((lv.deactivate, False), (lv.activate, True)):
			toggle()
			self.assertTrue(lv.isActive() is expected)

		vg.close()

	def test_lv_rename(self):
		"""
		Rename a thin LV to a random name, verify it, then rename it back.
		"""
		name = 'lv_test'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)
		lv, vg = TestLvm._get_lv(None, name)

		old_name = lv.getName()
		temp_name = rs()

		lv.rename(temp_name)
		self.assertEqual(lv.getName(), temp_name)

		lv.rename(old_name)
		vg.close()

	def test_lv_snapshot(self):
		"""
		Snapshot a thin LV and check that the snapshot exists and reports
		the thin LV as its origin.  The thick LV snapshot part is disabled
		(see the FIXME lines).
		"""
		thin_lv_name = 'thin_lv'
		thick_lv_name = 'thick_lv'

		device_names = TestLvm._get_pv_device_names()

		TestLvm._create_thin_lv(device_names[0:2], thin_lv_name)
		TestLvm._create_thick_lv(device_names[2:4], thick_lv_name)

		lv, vg = TestLvm._get_lv(None, thick_lv_name)
		# FIXME		lv.snapshot('thick_snap_shot', 1024*1024)
		vg.close()

		# FIXME		thick_ss, vg = TestLvm._get_lv(None, 'thick_snap_shot')
		# FIXME		self.assertTrue(thick_ss is not None)
		# FIXME		vg.close()

		thin_lv, vg = TestLvm._get_lv(None, thin_lv_name)
		thin_lv.snapshot('thin_snap_shot')
		vg.close()

		thin_ss, vg = TestLvm._get_lv(None, 'thin_snap_shot')
		self.assertTrue(thin_ss is not None)

		# Compare against the name rather than the thin_lv object, whose VG
		# handle has been closed above.  assertEqual is needed here:
		# assertTrue(a, b) treats b as the failure message and never
		# compares the two.
		# NOTE(review): assumes getOrigin() returns the origin LV's name
		# -- confirm against the binding.
		origin = thin_ss.getOrigin()
		self.assertEqual(thin_lv_name, origin)

		vg.close()

	def test_lv_suspend(self):
		"""
		isSuspended() of a thin LV returns a bool.
		"""
		name = 'lv_test'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)
		lv, vg = TestLvm._get_lv(None, name)

		suspended = lv.isSuspended()
		self.assertTrue(type(suspended) == bool)
		vg.close()

	def test_lv_size(self):
		"""
		getSize() of a thin LV returns an int or a long.
		"""
		name = 'lv_test'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)
		lv, vg = TestLvm._get_lv(None, name)

		size = lv.getSize()
		self.assertTrue(type(size) == int or type(size) == long)
		vg.close()

	def test_lv_resize(self):
		"""
		Grow a thin LV by 1 MiB and check that the reported size changed.
		"""
		lv_name = 'lv_test'
		TestLvm._create_thin_lv(TestLvm._get_pv_device_names(), lv_name)
		lv, vg = TestLvm._get_lv(None, lv_name)

		curr_size = lv.getSize()
		lv.resize(curr_size+(1024*1024))
		latest = lv.getSize()
		self.assertTrue(curr_size != latest)

		# Release the VG handle like the other LV tests do
		vg.close()

	def test_lv_seg(self):
		"""
		Every segment of a thin LV reports seg_start_pe as a long that is
		not settable.
		"""
		name = 'lv_test'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)
		lv, vg = TestLvm._get_lv(None, name)

		# getProperty returns a tuple, (value, bool settable)
		# TODO: Test other properties of lv_seg
		for seg in lv.listLVsegs():
			self._test_prop(seg, 'seg_start_pe', long, False)

		vg.close()

	def test_get_set_extend_size(self):
		"""
		Change the extent size of a VG and read the new value back.
		"""
		thick_lv = 'get_set_prop'
		device_names = TestLvm._get_pv_device_names()
		TestLvm._create_thick_lv(device_names[0:2], thick_lv)
		lv, vg = TestLvm._get_lv(None, thick_lv)

		# Derive the new value from the current one so the two always
		# differ; with equal values a setter that does nothing would go
		# unnoticed.
		# NOTE(review): halving is assumed to give an extent size the VG
		# accepts -- confirm against the limits enforced by liblvm.
		current_extent = vg.getExtentSize()
		new_extent = current_extent // 2

		self.assertFalse(current_extent == new_extent,
						 "Cannot determine if it works if they are the same")

		vg.setExtentSize(new_extent)
		self.assertEqual(vg.getExtentSize(), new_extent)
		vg.close()

	def test_vg_get_set_prop(self):
		"""
		Read vg_mda_copies from a VG and write the same value back.
		"""
		lv_name = 'get_set_prop'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thick_lv(devices[0:2], lv_name)
		lv, vg = TestLvm._get_lv(None, lv_name)

		self.assertTrue(vg is not None)
		if not vg:
			return

		# getProperty returns (value, settable); only the value is set
		current = vg.getProperty('vg_mda_copies')
		vg.setProperty('vg_mda_copies', current[0])
		vg.close()

	def test_vg_remove_restore(self):
		"""
		Remove a VG and create it again on the same physical devices.
		"""
		lv_name = 'get_set_prop'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thick_lv(devices[0:2], lv_name)
		lv, vg = TestLvm._get_lv(None, lv_name)

		vg_name = vg.getName()

		# Store off the list of physical devices before the VG goes away
		pv_devices = [p.getName() for p in vg.listPVs()]
		vg.close()

		TestLvm._remove_vg(vg_name)
		self._create_thick_lv(pv_devices, lv_name)

	def test_vg_names(self):
		"""
		lvm.listVgNames() returns a tuple.
		"""
		names = lvm.listVgNames()
		is_tuple = isinstance(names, tuple)
		self.assertTrue(is_tuple)

	def test_dupe_lv_create(self):
		"""
		Try to create a lv with the same name expecting a failure
		Note: This was causing a seg. fault previously
		"""
		lv_name = 'dupe_name'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thick_lv(devices[0:2], lv_name)
		lv, vg = TestLvm._get_lv(None, lv_name)

		self.assertTrue(vg is not None)
		if not vg:
			return

		lvs = vg.listLVs()

		if len(lvs) != 0:
			# Re-use name and size of the first LV for the second create
			existing = lvs[0]
			existing_name = existing.getName()
			self.assertRaises(lvm.LibLVMError, vg.createLvLinear,
							  existing_name, existing.getSize())

		vg.close()

	def test_vg_uuids(self):
		"""
		The UUIDs from lvm.listVgUuids() are exactly those of the VGs named
		by lvm.listVgNames(), ignoring the dashes in vg.getUuid().
		"""
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices[0:2], 'thin')
		TestLvm._create_thick_lv(devices[2:4], 'thick')

		listed = lvm.listVgUuids()

		self.assertTrue(len(listed) > 0)
		self.assertTrue(isinstance(listed, tuple))

		# Tick off every VG's UUID; nothing may be left over afterwards
		remaining = list(listed)

		for vg_name in lvm.listVgNames():
			vg = lvm.vgOpen(vg_name, "r")

			#TODO Write/fix BUG, vg uuid don't match between
			#lvm.listVgUuids and vg.getUuid()
			plain_uuid = vg.getUuid().replace('-', '')

			self.assertTrue(plain_uuid in remaining)
			remaining.remove(plain_uuid)
			vg.close()

		self.assertTrue(len(remaining) == 0)

	def test_pv_lookup_from_vg(self):
		"""
		Looking a PV up by name and by uuid through its VG gives PVs with
		the same name and uuid as the one listed by the VG.
		"""
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices[0:2], 'thin')
		TestLvm._create_thick_lv(devices[2:4], 'thick')

		test_vgs = TestLvm._vg_names()

		self.assertTrue(len(test_vgs) > 0)

		for vg_name in test_vgs:
			vg = lvm.vgOpen(vg_name, 'w')
			pvs = vg.listPVs()

			for p in pvs:
				name = p.getName()
				uuid = p.getUuid()

				by_name = vg.pvFromName(name)
				by_uuid = vg.pvFromUuid(uuid)

				# Both lookups have to agree with each other ...
				self.assertTrue(by_name.getName() == by_uuid.getName())
				self.assertTrue(by_name.getUuid() == by_uuid.getUuid())

				# ... and with the PV they were derived from
				self.assertTrue(name == by_name.getName())
				self.assertTrue(uuid == by_uuid.getUuid())

				# Let go of the PV objects before the VG gets closed
				by_name = None
				by_uuid = None
				p = None

			pvs = None
			vg.close()

	def test_percent_to_float(self):
		"""
		lvm.percentToFloat() maps 0, 1000000 and half of that to 0.0, 1.0
		and 0.5.
		"""
		cases = ((0, 0.0), (1000000, 1.0), (1000000 / 2, 0.5))

		for raw, expected in cases:
			self.assertEqual(lvm.percentToFloat(raw), expected)

	def test_scan(self):
		"""
		lvm.scan() returns None.
		"""
		result = lvm.scan()
		self.assertEqual(result, None)

	def test_config_reload(self):
		"""
		lvm.configReload() returns None.
		"""
		result = lvm.configReload()
		self.assertEqual(result, None)

	def test_config_override(self):
		"""
		lvm.configOverride() returns None for the setting "global.test = 1".
		"""
		# assertEqual replaces its deprecated alias assertEquals
		self.assertEqual(lvm.configOverride("global.test = 1"), None)

	def test_config_find_bool(self):
		"""
		lvm.configFindBool() returns a bool for fallback_to_local_locking
		and a true value for locking_type.
		"""
		fallback = lvm.configFindBool("global/fallback_to_local_locking")
		self.assertTrue(type(fallback) == bool)

		locking = lvm.configFindBool("global/locking_type")
		self.assertTrue(locking)

	def test_vg_from_pv_lookups(self):
		"""
		For every PV of every test VG, looking the VG name up by the PV's
		uuid and by its device name gives the name of the VG.
		"""
		device_names = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(device_names[0:2], 'thin')
		TestLvm._create_thick_lv(device_names[2:4], 'thick')

		vgname_list = TestLvm._vg_names()

		self.assertTrue(len(vgname_list) > 0)

		for vg_name in vgname_list:
			vg = lvm.vgOpen(vg_name, 'r')

			# The lookups are compared with what the VG itself reports
			reported_name = vg.getName()

			for pv in vg.listPVs():
				# assertEqual is used for both checks; the first one used
				# the deprecated assertEquals alias.
				self.assertEqual(reported_name,
								 lvm.vgNameFromPvid(pv.getUuid()))
				self.assertEqual(reported_name,
								 lvm.vgNameFromDevice(pv.getName()))
			vg.close()

	def test_vg_get_name(self):
		"""
		vg.getName() returns the name the VG was opened with.
		"""
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices[0:2], 'thin')
		TestLvm._create_thick_lv(devices[2:4], 'thick')

		test_vgs = TestLvm._vg_names()

		self.assertTrue(len(test_vgs) > 0)

		for expected_name in test_vgs:
			vg = lvm.vgOpen(expected_name, 'r')
			self.assertEqual(vg.getName(), expected_name)
			vg.close()

	def test_vg_get_uuid(self):
		"""
		vg.getUuid() returns a non-empty value for every test VG.
		"""
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices[0:2], 'thin')
		TestLvm._create_thick_lv(devices[2:4], 'thick')

		test_vgs = TestLvm._vg_names()

		self.assertTrue(len(test_vgs) > 0)

		for name in test_vgs:
			vg = lvm.vgOpen(name, 'r')
			vg_uuid = vg.getUuid()
			self.assertNotEqual(vg_uuid, None)
			self.assertTrue(len(vg_uuid) > 0)
			vg.close()

	# Names of the VG methods test_vg_getters calls without arguments and
	# whose result has to be an int or a long.  Each name is listed once.
	RETURN_NUMERIC = ["getSeqno", "getSize", "getFreeSize",
					  "getExtentSize", "getExtentCount", "getFreeExtentCount",
					  "getPvCount", "getMaxPv", "getMaxLv"]

	def test_vg_getters(self):
		"""
		Check the return types of the VG getters: the is*() predicates
		return bool, the methods named in RETURN_NUMERIC int or long.
		"""
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices[0:2], 'thin')
		TestLvm._create_thick_lv(devices[2:4], 'thick')

		test_vgs = TestLvm._vg_names()

		self.assertTrue(len(test_vgs) > 0)

		for name in test_vgs:
			vg = lvm.vgOpen(name, 'r')

			for predicate in (vg.isClustered, vg.isExported, vg.isPartial):
				self.assertTrue(type(predicate()) == bool)

			# Loop through the list invoking the method
			for method_name in TestLvm.RETURN_NUMERIC:
				value = getattr(vg, method_name)()
				self.assertTrue(type(value) == int or type(value) == long)

			vg.close()

	def _test_tags(self, tag_obj):
		"""
		Add a random number of random tags to tag_obj, then remove them one
		by one in random order and check that the tags present at the start
		are the only ones left.

		tag_obj has to provide getTags(), addTag() and removeTag().
		"""
		original_tags = tag_obj.getTags()
		self.assertTrue(type(original_tags) == tuple)

		added = []
		for _ in range(random.randint(2, 40)):
			new_tag = rs(random.randint(1, 128))
			tag_obj.addTag(new_tag)
			added.append(new_tag)

		with_added = tag_obj.getTags()
		self.assertTrue(len(original_tags) + len(added) == len(with_added))

		# Remove the added tags again, picking a random one each time
		while added:
			victim = added[random.randint(0, len(added) - 1)]
			added.remove(victim)

			tag_obj.removeTag(victim)

			self.assertFalse(victim in tag_obj.getTags())

		final_tags = tag_obj.getTags()
		self.assertTrue(len(final_tags) == len(original_tags))
		for tag in original_tags:
			self.assertTrue(tag in final_tags)

	def test_vg_tags(self):
		"""
		Create one VG per allowed device, alternating thin and thick LVs,
		and run the shared tag add/remove checks against every test VG.
		"""
		devices = TestLvm._get_pv_device_names()

		for index, device in enumerate(devices):
			# Even positions get a thin LV, odd ones a thick LV
			if index % 2 == 0:
				TestLvm._create_thin_lv([device], "thin_lv%d" % index)
			else:
				TestLvm._create_thick_lv([device], "thick_lv%d" % index)

		for name in TestLvm._vg_names():
			vg = lvm.vgOpen(name, 'w')
			self._test_tags(vg)
			vg.close()

	@staticmethod
	def test_listing():
		"""
		Log the environment, every PV name and every VG name; nothing is
		asserted, the test passes as long as the listings do not raise.
		"""
		for key, value in os.environ.items():
			l("%s:%s" % (key, value))

		with lvm.listPvs() as pvs:
			for pv in pvs:
				l('pv= %s' % pv.getName())

		l('Checking for VG')
		for vg_name in lvm.listVgNames():
			l('vg= %s' % vg_name)

	def test_pv_empty_listing(self):
		"""
		Listing PVs after all allowed PVs were removed yields nothing.
		"""
		# We had a bug where we would seg. fault if we had no PVs.

		l('testPVemptylisting entry')

		devices = TestLvm._get_pv_device_names()

		for device in devices:
			l("Removing %s" % device)
			lvm.pvRemove(device)

		seen = 0

		with lvm.listPvs() as pvs:
			for pv in pvs:
				seen += 1
				l('pv= %s' % pv.getName())

		self.assertTrue(seen == 0)

		# Put the PVs back
		for device in devices:
			lvm.pvCreate(device)

	def test_pv_create(self):
		"""
		Exercise lvm.pvCreate() on the first allowed device: first with
		arguments that are expected to raise, then with a growing number of
		positional arguments in every combination of the values below.
		Each successful create is followed by a pvRemove of the device.
		"""
		# Candidate values for the 2nd to 7th positional argument of
		# pvCreate, in that order
		size = [0, 1024*1024*4]
		pvmeta_copies = [0, 1, 2]
		pvmeta_size = [0, 255, 512, 1024]
		data_alignment = [0, 2048, 4096]
		# Multipliers applied to the alignment to get the alignment offset
		data_alignment_offset = [1, 1, 1]
		zero = [0, 1]

		device_names = TestLvm._get_pv_device_names()

		# Start with none of the allowed devices being a PV
		for d in device_names:
			lvm.pvRemove(d)

		d = device_names[0]

		#Test some error cases
		self.assertRaises(TypeError, lvm.pvCreate, None)
		self.assertRaises(lvm.LibLVMError, lvm.pvCreate, '')
		self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 4)
		self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 0, 4)
		self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 0, 0, 0, 2**34)
		self.assertRaises(lvm.LibLVMError, lvm.pvCreate, d, 0, 0, 0, 4096,
						  2**34)

		#Try a number of combinations and permutations
		# Every nesting level passes one more positional argument
		for s in size:
			lvm.pvCreate(d, s)
			lvm.pvRemove(d)
			for copies in pvmeta_copies:
				lvm.pvCreate(d, s, copies)
				lvm.pvRemove(d)
				for pv_size in pvmeta_size:
					lvm.pvCreate(d, s, copies, pv_size)
					lvm.pvRemove(d)
					for align in data_alignment:
						lvm.pvCreate(d, s, copies, pv_size, align)
						lvm.pvRemove(d)
						for align_offset in data_alignment_offset:
							lvm.pvCreate(d, s, copies, pv_size, align,
										 align * align_offset)
							lvm.pvRemove(d)
							for z in zero:
								lvm.pvCreate(d, s, copies, pv_size, align,
											 align * align_offset, z)
								lvm.pvRemove(d)

		#Restore
		for d in device_names:
			lvm.pvCreate(d)

	def test_vg_reduce(self):
		"""
		Test the case where we try to reduce a vg where the last PV has
		no metadata copies.  In this case the reduce should fail.
		"""
		vg_name = TestLvm.VG_P + 'reduce_test'

		devices = TestLvm._get_pv_device_names()

		for device in devices:
			lvm.pvRemove(device)

		first, second, third, fourth = devices[0:4]

		lvm.pvCreate(first, 0, 0)  # Size all, pvmetadatacopies 0
		for device in (second, third, fourth):
			lvm.pvCreate(device)

		vg = lvm.vgCreate(vg_name)

		# Added in reverse order, the PV without metadata goes in last
		for device in (fourth, third, second, first):
			vg.extend(device)
		vg.close()

		vg = None

		vg = lvm.vgOpen(vg_name, 'w')

		for device in (fourth, third):
			vg.reduce(device)

		# Removing this one would leave only the PV without metadata
		self.assertRaises(lvm.LibLVMError, vg.reduce, second)

		vg.close()
		vg = None

		vg = lvm.vgOpen(vg_name, 'w')
		vg.remove()
		vg.close()

	@staticmethod
	def _test_valid_names(method):
		sample = 'azAZ09._-+'

		method('x' * 127)
		method('.X')
		method('..X')

		for i in range(1, 7):
			tests = (''.join(i) for i in itertools.product(sample, repeat=i))
			for t in tests:
				if t == '.' or t == '..':
					t += 'X'
				elif t.startswith('-'):
					t = 'H' + t
				method(t)

	def _test_bad_names(self, method, dupe_name):
		"""
		Assert that method raises lvm.LibLVMError for dupe_name and for a
		fixed set of invalid names.
		"""
		rejected = (
			# Test for duplicate name
			dupe_name,
			# Test for too long a name
			'x' * 128,
			# Test empty
			'',
			# Invalid characters
			'&invalid^char',
			# Cannot start with .. and no following characters
			'..',
			# Cannot start with . and no following characters
			'.',
			# Cannot start with a hyphen
			'-not_good',
		)

		for name in rejected:
			self.assertRaises(lvm.LibLVMError, method, name)

	def _lv_reserved_names(self, method):
		"""
		Assert that method raises lvm.LibLVMError for names that start with
		a reserved prefix or that contain or start with a reserved token.
		"""
		for prefix in ('snapshot', 'pvmove'):
			self.assertRaises(lvm.LibLVMError, method, prefix + rs(3))

		tokens = ('_mlog', '_mimage', '_pmspare', '_rimage', '_rmeta',
				  '_vorigin', '_tdata', '_tmeta')

		for token in tokens:
			# Token in the middle of the name, then at its start
			embedded = rs(3) + token + rs(1)
			self.assertRaises(lvm.LibLVMError, method, embedded)

			leading = token + rs(1)
			self.assertRaises(lvm.LibLVMError, method, leading)

	def test_vg_lv_name_validate(self):
		"""
		Run the bad, valid and reserved name checks against
		lvm.vgNameValidate and vg.lvNameValidate.
		"""
		name = 'vg_lv_name_validate'
		devices = TestLvm._get_pv_device_names()
		TestLvm._create_thin_lv(devices, name)
		lv, vg = TestLvm._get_lv(None, name)

		# The existing VG and LV names serve as the duplicates
		self._test_bad_names(lvm.vgNameValidate, vg.getName())
		self._test_bad_names(vg.lvNameValidate, lv.getName())

		# Test good values
		for validator in (lvm.vgNameValidate, vg.lvNameValidate):
			TestLvm._test_valid_names(validator)

		self._lv_reserved_names(vg.lvNameValidate)

		vg.close()

# Run the tests through unittest when the file is executed as a script
if __name__ == "__main__":
	unittest.main()