1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-25 23:21:54 +03:00

s4/fsmo: Extended fsmo test with infrastructure, pdc and rid roles

This commit is contained in:
Anatoliy Atanasov 2010-09-14 18:07:09 +03:00
parent 2eeba94c9c
commit ccb7fdc52b

View File

@ -49,6 +49,8 @@ class DrsFsmoTestCase(samba.tests.TestCase):
def setUp(self): def setUp(self):
super(DrsFsmoTestCase, self).setUp() super(DrsFsmoTestCase, self).setUp()
# we have to wait for the replication before we make the check
self.sleep_time = 5
# connect to DCs singleton # connect to DCs singleton
if self.ldb_dc1 is None: if self.ldb_dc1 is None:
DrsFsmoTestCase.dc1 = get_env_var("DC1") DrsFsmoTestCase.dc1 = get_env_var("DC1")
@ -71,8 +73,13 @@ class DrsFsmoTestCase(samba.tests.TestCase):
# cache some of RootDSE props # cache some of RootDSE props
self.schema_dn = self.info_dc1["schemaNamingContext"][0] self.schema_dn = self.info_dc1["schemaNamingContext"][0]
self.domain_dn = self.info_dc1["defaultNamingContext"][0]
self.config_dn = self.info_dc1["configurationNamingContext"][0]
self.dsServiceName_dc1 = self.info_dc1["dsServiceName"][0] self.dsServiceName_dc1 = self.info_dc1["dsServiceName"][0]
self.dsServiceName_dc2 = self.info_dc2["dsServiceName"][0] self.dsServiceName_dc2 = self.info_dc2["dsServiceName"][0]
self.infrastructure_dn = "CN=Infrastructure," + self.domain_dn
self.naming_dn = "CN=Partitions," + self.config_dn
self.rid_dn = "CN=RID Manager$,CN=System," + self.domain_dn
# we will need DCs DNS names for 'net fsmo' command # we will need DCs DNS names for 'net fsmo' command
self.dnsname_dc1 = self.info_dc1["dnsHostName"][0] self.dnsname_dc1 = self.info_dc1["dnsHostName"][0]
@ -82,50 +89,66 @@ class DrsFsmoTestCase(samba.tests.TestCase):
def tearDown(self): def tearDown(self):
super(DrsFsmoTestCase, self).tearDown() super(DrsFsmoTestCase, self).tearDown()
def _net_fsmo_schema_transfer(self, DC): def _net_fsmo_role_transfer(self, DC, role):
# find out where is net command # find out where is net command
net_cmd = os.path.abspath("./bin/net") net_cmd = os.path.abspath("./bin/net")
# make command line credentials string # make command line credentials string
creds = samba.tests.cmdline_credentials creds = samba.tests.cmdline_credentials
cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(), cmd_line_auth = "-U%s/%s%%%s" % (creds.get_domain(),
creds.get_username(), creds.get_password()) creds.get_username(), creds.get_password())
# bin/net fsmo transfer --role=schema --host=ldap://<Dest_DC_NAME>:389 # bin/net fsmo transfer --role=role --host=ldap://DC:389
cmd_line = "%s fsmo transfer --role=schema --host=ldap://%s:389 %s" % (net_cmd, DC, cmd_line = "%s fsmo transfer --role=%s --host=ldap://%s:389 %s" % (net_cmd, role, DC,
cmd_line_auth) cmd_line_auth)
ret = os.system(cmd_line) ret = os.system(cmd_line)
self.assertEquals(ret, 0, "Transfering schema to %s has failed!" % (DC)) self.assertEquals(ret, 0, "Transfering schema to %s has failed!" % (DC))
pass pass
def _role_transfer(self, role, role_dn):
def test_SchemaMasterTransfer(self): """Triggers transfer of role from DC1 to DC2
"""Triggers schema master transfer role from DC1 to DC2
and vice versa so the role goes back to the original dc""" and vice versa so the role goes back to the original dc"""
# dc2 gets the schema master role from dc1 # dc2 gets the schema master role from dc1
print "Testing for role transfer from %s to %s" % (self.dnsname_dc1, self.dnsname_dc2) print "Testing for %s role transfer from %s to %s" % (role, self.dnsname_dc1, self.dnsname_dc2)
self._net_fsmo_schema_transfer(DC=self.dnsname_dc2) self._net_fsmo_role_transfer(DC=self.dnsname_dc2, role=role)
# check if the role is transfered, but wait a little first so the getncchanges can pass # check if the role is transfered, but wait a little first so the getncchanges can pass
time.sleep(20) time.sleep(self.sleep_time)
res = self.ldb_dc2.search(self.schema_dn, res = self.ldb_dc2.search(role_dn,
scope=SCOPE_BASE, attrs=["fSMORoleOwner"]) scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
assert len(res) == 1 assert len(res) == 1
self.schemaMaster = res[0]["fSMORoleOwner"][0] self.master = res[0]["fSMORoleOwner"][0]
self.assertEquals(self.schemaMaster, self.dsServiceName_dc2, self.assertEquals(self.master, self.dsServiceName_dc2,
"Transfering schema to %s has failed, schema master is: %s!"%(self.dsServiceName_dc2,self.schemaMaster)) "Transfering %s role to %s has failed, master is: %s!"%(role, self.dsServiceName_dc2,self.master))
# dc1 gets back the schema master role from dc2 # dc1 gets back the schema master role from dc2
print "Testing for role transfer from %s to %s" % (self.dnsname_dc2, self.dnsname_dc1) print "Testing for %s role transfer from %s to %s" % (role, self.dnsname_dc2, self.dnsname_dc1)
self._net_fsmo_schema_transfer(DC=self.dnsname_dc1) self._net_fsmo_role_transfer(DC=self.dnsname_dc1, role=role);
# check if the role is transfered # check if the role is transfered
time.sleep(20) time.sleep(self.sleep_time)
res = self.ldb_dc1.search(self.schema_dn, res = self.ldb_dc1.search(role_dn,
scope=SCOPE_BASE, attrs=["fSMORoleOwner"]) scope=SCOPE_BASE, attrs=["fSMORoleOwner"])
assert len(res) == 1 assert len(res) == 1
self.schemaMaster = res[0]["fSMORoleOwner"][0] self.master = res[0]["fSMORoleOwner"][0]
self.assertEquals(self.schemaMaster, self.dsServiceName_dc1, self.assertEquals(self.master, self.dsServiceName_dc1,
"Transfering schema to %s has failed, schema master is %s"%(self.dsServiceName_dc1, self.schemaMaster)) "Transfering %s role to %s has failed, master is %s"%(role, self.dsServiceName_dc1, self.master))
pass pass
def test_SchemaMasterTransfer(self):
self._role_transfer(role="schema", role_dn=self.schema_dn)
pass
def test_InfrastructureMasterTransfer(self):
self._role_transfer(role="infrastructure", role_dn=self.infrastructure_dn)
pass
def test_PDCMasterTransfer(self):
self._role_transfer(role="pdc", role_dn=self.domain_dn)
pass
def test_RIDMasterTransfer(self):
self._role_transfer(role="rid", role_dn=self.rid_dn)
pass
######################################################################################## ########################################################################################
def get_env_var(var_name): def get_env_var(var_name):
if not var_name in os.environ.keys(): if not var_name in os.environ.keys():