mirror of
https://github.com/samba-team/samba.git
synced 2025-07-16 20:59:12 +03:00
Formatting cleanups; use True/False for booleans, unnecessary backslashes, spacing.
This commit is contained in:
@ -81,6 +81,7 @@ def find_setup_dir():
|
||||
# hard coded at this point, but will probably be changed when
|
||||
# we enable different fsmo roles
|
||||
|
||||
|
||||
def get_config_descriptor(domain_sid):
|
||||
sddl = "O:EAG:EAD:(OA;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \
|
||||
"(OA;;CR;1131f6ab-9c07-11d1-f79f-00c04fc2dcd2;;ED)" \
|
||||
@ -192,8 +193,10 @@ class ProvisionNames(object):
|
||||
self.sitename = None
|
||||
self.smbconf = None
|
||||
|
||||
def updateProvisionUSN(samdb, low, high, replace = 0):
|
||||
|
||||
def update_provision_usn(samdb, low, high, replace=False):
|
||||
"""Update the field provisionUSN in sam.ldb
|
||||
|
||||
This field is used to track range of USN modified by provision and
|
||||
upgradeprovision.
|
||||
This value is used afterward by next provision to figure out if
|
||||
@ -203,26 +206,28 @@ def updateProvisionUSN(samdb, low, high, replace = 0):
|
||||
:param low: The lowest USN modified by this upgrade
|
||||
:param high: The highest USN modified by this upgrade
|
||||
:param replace: A boolean indicating if the range should replace any
|
||||
existing one or appended (default)"""
|
||||
existing one or appended (default)
|
||||
"""
|
||||
|
||||
tab = []
|
||||
if not replace:
|
||||
entry = samdb.search(expression="(&(dn=@PROVISION)(%s=*))" % \
|
||||
LAST_PROVISION_USN_ATTRIBUTE, base="",
|
||||
scope=ldb.SCOPE_SUBTREE,
|
||||
attrs=[LAST_PROVISION_USN_ATTRIBUTE,"dn"])
|
||||
attrs=[LAST_PROVISION_USN_ATTRIBUTE, "dn"])
|
||||
for e in entry[0][LAST_PROVISION_USN_ATTRIBUTE]:
|
||||
tab.append(str(e))
|
||||
|
||||
tab.append("%s-%s"%(str(low), str(high)))
|
||||
tab.append("%s-%s" % (low, high))
|
||||
delta = ldb.Message()
|
||||
delta.dn = ldb.Dn(samdb,"@PROVISION")
|
||||
delta.dn = ldb.Dn(samdb, "@PROVISION")
|
||||
delta[LAST_PROVISION_USN_ATTRIBUTE] = ldb.MessageElement(tab,
|
||||
ldb.FLAG_MOD_REPLACE,
|
||||
LAST_PROVISION_USN_ATTRIBUTE)
|
||||
samdb.modify(delta)
|
||||
|
||||
def setProvisionUSN(samdb, low, high):
|
||||
|
||||
def set_provision_usn(samdb, low, high):
|
||||
"""Set the field provisionUSN in sam.ldb
|
||||
This field is used to track range of USN modified by provision and
|
||||
upgradeprovision.
|
||||
@ -233,14 +238,15 @@ def setProvisionUSN(samdb, low, high):
|
||||
:param low: The lowest USN modified by this upgrade
|
||||
:param high: The highest USN modified by this upgrade"""
|
||||
tab = []
|
||||
tab.append("%s-%s"%(str(low), str(high)))
|
||||
tab.append("%s-%s" % (low, high))
|
||||
delta = ldb.Message()
|
||||
delta.dn = ldb.Dn(samdb,"@PROVISION")
|
||||
delta.dn = ldb.Dn(samdb, "@PROVISION")
|
||||
delta[LAST_PROVISION_USN_ATTRIBUTE] = ldb.MessageElement(tab,
|
||||
ldb.FLAG_MOD_ADD,
|
||||
LAST_PROVISION_USN_ATTRIBUTE)
|
||||
samdb.add(delta)
|
||||
|
||||
|
||||
def get_max_usn(samdb,basedn):
|
||||
""" This function return the biggest USN present in the provision
|
||||
|
||||
@ -256,7 +262,7 @@ def get_max_usn(samdb,basedn):
|
||||
"paged_results:1:1"])
|
||||
return res[0]["uSNChanged"]
|
||||
|
||||
def getLastProvisionUSN(sam):
|
||||
def get_last_provision_usn(sam):
|
||||
"""Get the lastest USN modified by a provision or an upgradeprovision
|
||||
|
||||
:param sam: An LDB object pointing to the sam.ldb
|
||||
@ -541,7 +547,7 @@ def make_smbconf(smbconf, setup_path, hostname, domain, realm, serverrole,
|
||||
privdir = os.path.join(targetdir, "private")
|
||||
else:
|
||||
privdir = default_lp.get("private dir")
|
||||
posixeadb_line = "posix:eadb = " + os.path.abspath(os.path.join(privdir,"eadb.tdb"))
|
||||
posixeadb_line = "posix:eadb = " + os.path.abspath(os.path.join(privdir, "eadb.tdb"))
|
||||
else:
|
||||
posixeadb_line = ""
|
||||
|
||||
@ -1159,7 +1165,7 @@ def set_gpo_acl(sysvol, dnsdomain, domainsid, domaindn, samdb, lp):
|
||||
set_dir_acl(policy_path,dsacl2fsacl(POLICIES_ACL, str(domainsid)),
|
||||
lp, str(domainsid))
|
||||
res = samdb.search(base="CN=Policies,CN=System,%s"%(domaindn),
|
||||
attrs=["cn","nTSecurityDescriptor"],
|
||||
attrs=["cn", "nTSecurityDescriptor"],
|
||||
expression="", scope=ldb.SCOPE_ONELEVEL)
|
||||
for policy in res:
|
||||
acl = ndr_unpack(security.descriptor,
|
||||
@ -1322,8 +1328,8 @@ def provision(setup_dir, logger, session_info,
|
||||
|
||||
if not os.path.exists(paths.private_dir):
|
||||
os.mkdir(paths.private_dir)
|
||||
if not os.path.exists(os.path.join(paths.private_dir,"tls")):
|
||||
os.mkdir(os.path.join(paths.private_dir,"tls"))
|
||||
if not os.path.exists(os.path.join(paths.private_dir, "tls")):
|
||||
os.mkdir(os.path.join(paths.private_dir, "tls"))
|
||||
|
||||
ldapi_url = "ldapi://%s" % urllib.quote(paths.s4_ldapi_path, safe="")
|
||||
|
||||
@ -1489,12 +1495,12 @@ def provision(setup_dir, logger, session_info,
|
||||
logger.info("A Kerberos configuration suitable for Samba 4 has been "
|
||||
"generated at %s", paths.krb5conf)
|
||||
|
||||
lastProvisionUSNs = getLastProvisionUSN(samdb)
|
||||
lastProvisionUSNs = get_last_provision_usn(samdb)
|
||||
maxUSN = get_max_usn(samdb, str(names.rootdn))
|
||||
if lastProvisionUSNs != None:
|
||||
updateProvisionUSN(samdb, 0, maxUSN, 1)
|
||||
update_provision_usn(samdb, 0, maxUSN, 1)
|
||||
else:
|
||||
setProvisionUSN(samdb, 0, maxUSN)
|
||||
set_provision_usn(samdb, 0, maxUSN)
|
||||
|
||||
if serverrole == "domain controller":
|
||||
create_dns_update_list(lp, logger, paths, setup_path)
|
||||
@ -1545,7 +1551,6 @@ def provision(setup_dir, logger, session_info,
|
||||
logger.info("This slapd-Commandline is also stored under: %s/ldap_backend_startup.sh",
|
||||
provision_backend.ldapdir)
|
||||
|
||||
|
||||
result = ProvisionResult()
|
||||
result.domaindn = domaindn
|
||||
result.paths = paths
|
||||
@ -1708,6 +1713,7 @@ def create_named_conf(paths, setup_path, realm, dnsdomain,
|
||||
|
||||
setup_file(setup_path("named.conf.update"), paths.namedconf_update)
|
||||
|
||||
|
||||
def create_named_txt(path, setup_path, realm, dnsdomain,
|
||||
private_dir, keytab_name):
|
||||
"""Write out a file containing zone statements suitable for inclusion in a
|
||||
@ -1729,6 +1735,7 @@ def create_named_txt(path, setup_path, realm, dnsdomain,
|
||||
"PRIVATE_DIR": private_dir
|
||||
})
|
||||
|
||||
|
||||
def create_krb5_conf(path, setup_path, dnsdomain, hostname, realm):
|
||||
"""Write out a file containing zone statements suitable for inclusion in a
|
||||
named.conf file (including GSS-TSIG configuration).
|
||||
|
@ -40,9 +40,11 @@ def create_dummy_secretsdb(path, lp=None):
|
||||
secrets_ldb.transaction_commit()
|
||||
return secrets_ldb
|
||||
|
||||
|
||||
class ProvisionTestCase(samba.tests.TestCaseInTempDir):
|
||||
"""Some simple tests for individual functions in the provisioning code.
|
||||
"""
|
||||
|
||||
def test_setup_secretsdb(self):
|
||||
path = os.path.join(self.tempdir, "secrets.ldb")
|
||||
ldb = setup_secretsdb(path, setup_path, None, None, lp=env_loadparm())
|
||||
|
@ -23,16 +23,13 @@ from samba.upgradehelpers import (usn_in_range, dn_sort,
|
||||
construct_existor_expr)
|
||||
|
||||
from samba.tests.provision import create_dummy_secretsdb
|
||||
from samba.tests import env_loadparm, TestCaseInTempDir
|
||||
from samba.tests import TestCaseInTempDir
|
||||
from samba import Ldb
|
||||
from ldb import SCOPE_SUBTREE
|
||||
import samba.tests
|
||||
|
||||
lp = env_loadparm()
|
||||
|
||||
def dummymessage(a=None, b=None):
|
||||
if 0:
|
||||
print "none"
|
||||
pass
|
||||
|
||||
|
||||
class UpgradeProvisionTestCase(TestCaseInTempDir):
|
||||
@ -60,7 +57,8 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):
|
||||
self.assertEquals(dn_sort("dc=toto,dc=tata",
|
||||
"cn=foo,dc=toto,dc=tata"), -1)
|
||||
self.assertEquals(dn_sort("cn=bar, dc=toto,dc=tata",
|
||||
"cn=foo, dc=toto,dc=tata"),-1)
|
||||
"cn=foo, dc=toto,dc=tata"), -1)
|
||||
|
||||
def test_get_diff_sddl(self):
|
||||
sddl = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\
|
||||
(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)S:AI(AU;CIIDSA;WP;;;WD)"
|
||||
@ -75,19 +73,19 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):
|
||||
sddl5 = "O:SAG:DUD:AI(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA)\
|
||||
(A;CIID;RP LCLORC;;;AU)(A;CIID;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
|
||||
|
||||
self.assertEquals(get_diff_sddls(sddl, sddl1) ,"")
|
||||
self.assertEquals(get_diff_sddls(sddl, sddl1), "")
|
||||
txt = get_diff_sddls(sddl, sddl2)
|
||||
self.assertEquals(txt ,"\tOwner mismatch: SA (in ref) BA(in current)\n")
|
||||
self.assertEquals(txt, "\tOwner mismatch: SA (in ref) BA(in current)\n")
|
||||
txt = get_diff_sddls(sddl, sddl3)
|
||||
self.assertEquals(txt ,"\tGroup mismatch: DU (in ref) BA(in current)\n")
|
||||
self.assertEquals(txt, "\tGroup mismatch: DU (in ref) BA(in current)\n")
|
||||
txt = get_diff_sddls(sddl, sddl4)
|
||||
txtmsg = "\tPart dacl is different between reference and current here\
|
||||
is the detail:\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;BA) ACE is not present in\
|
||||
the reference\n\t\t(A;CIID;RPWPCRCCLCLORCWOWDSW;;;SA) ACE is not present in\
|
||||
the current\n"
|
||||
self.assertEquals(txt , txtmsg)
|
||||
self.assertEquals(txt, txtmsg)
|
||||
txt = get_diff_sddls(sddl, sddl5)
|
||||
self.assertEquals(txt ,"\tCurrent ACL hasn't a sacl part\n")
|
||||
self.assertEquals(txt, "\tCurrent ACL hasn't a sacl part\n")
|
||||
|
||||
def test_construct_existor_expr(self):
|
||||
res = construct_existor_expr([])
|
||||
@ -99,7 +97,9 @@ class UpgradeProvisionTestCase(TestCaseInTempDir):
|
||||
res = construct_existor_expr(["foo", "bar"])
|
||||
self.assertEquals(res, "(|(foo=*)(bar=*))")
|
||||
|
||||
|
||||
class UpdateSecretsTests(samba.tests.TestCaseInTempDir):
|
||||
|
||||
def setUp(self):
|
||||
super(UpdateSecretsTests, self).setUp()
|
||||
self.referencedb = create_dummy_secretsdb(
|
||||
|
@ -20,6 +20,8 @@
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
|
||||
from samba import param
|
||||
from samba.credentials import Credentials
|
||||
from samba.auth import system_session
|
||||
from samba.provision import getpolicypath
|
||||
@ -27,67 +29,66 @@ from samba.upgradehelpers import (get_paths, get_ldbs,
|
||||
find_provision_key_parameters, identic_rename,
|
||||
updateOEMInfo, getOEMInfo, update_gpo,
|
||||
delta_update_basesamdb,search_constructed_attrs_stored)
|
||||
|
||||
from samba.tests.provision import create_dummy_secretsdb
|
||||
from samba import param
|
||||
from samba.tests import env_loadparm, TestCaseInTempDir
|
||||
from samba.tests.provision import create_dummy_secretsdb
|
||||
import ldb
|
||||
|
||||
|
||||
def dummymessage(a=None, b=None):
|
||||
if 0:
|
||||
print "none"
|
||||
pass
|
||||
|
||||
lp = env_loadparm()
|
||||
smbConfPath = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
|
||||
smb_conf_path = "%s/%s/%s" % (os.environ["SELFTEST_PREFIX"], "dc", "etc/smb.conf")
|
||||
|
||||
class UpgradeProvisionBasicLdbHelpersTestCase(TestCaseInTempDir):
|
||||
"""Some simple tests for individual functions in the provisioning code.
|
||||
"""
|
||||
|
||||
def test_get_ldbs(self):
|
||||
paths = get_paths(param, None, smbConfPath)
|
||||
paths = get_paths(param, None, smb_conf_path)
|
||||
creds = Credentials()
|
||||
lp = env_loadparm()
|
||||
creds.guess(lp)
|
||||
get_ldbs(paths, creds, system_session(), lp)
|
||||
|
||||
def test_find_key_param(self):
|
||||
paths = get_paths(param, None, smbConfPath)
|
||||
paths = get_paths(param, None, smb_conf_path)
|
||||
creds = Credentials()
|
||||
lp = env_loadparm()
|
||||
creds.guess(lp)
|
||||
rootdn = "dc=samba,dc=example,dc=com"
|
||||
ldbs = get_ldbs(paths, creds, system_session(), lp)
|
||||
names = find_provision_key_parameters(ldbs.sam, ldbs.secrets, ldbs.idmap,
|
||||
paths, smbConfPath, lp)
|
||||
paths, smb_conf_path, lp)
|
||||
self.assertEquals(names.realm, "SAMBA.EXAMPLE.COM")
|
||||
self.assertTrue(str(names.rootdn).lower() == rootdn.lower())
|
||||
self.assertEquals(str(names.rootdn).lower(), rootdn.lower())
|
||||
self.assertTrue(names.policyid_dc != None)
|
||||
self.assertTrue(names.ntdsguid != "")
|
||||
|
||||
|
||||
class UpgradeProvisionWithLdbTestCase(TestCaseInTempDir):
|
||||
|
||||
def _getEmptyDbName(self):
|
||||
return os.path.join(self.tempdir, "sam.ldb")
|
||||
|
||||
def setUp(self):
|
||||
super(UpgradeProvisionWithLdbTestCase, self).setUp()
|
||||
paths = get_paths(param, None, smbConfPath)
|
||||
paths = get_paths(param, None, smb_conf_path)
|
||||
self.creds = Credentials()
|
||||
self.creds.guess(lp)
|
||||
self.lp = env_loadparm()
|
||||
self.creds.guess(self.lp)
|
||||
self.paths = paths
|
||||
self.ldbs = get_ldbs(paths, self.creds, system_session(), lp)
|
||||
self.lp = lp
|
||||
self.ldbs = get_ldbs(paths, self.creds, system_session(), self.lp)
|
||||
self.names = find_provision_key_parameters(self.ldbs.sam, self.ldbs.secrets,
|
||||
self.ldbs.idmap, paths, smbConfPath, lp)
|
||||
self.ldbs.idmap, paths, smb_conf_path, self.lp)
|
||||
self.referencedb = create_dummy_secretsdb(
|
||||
os.path.join(self.tempdir, "ref.ldb"))
|
||||
|
||||
|
||||
def test_search_constructed_attrs_stored(self):
|
||||
hashAtt = search_constructed_attrs_stored(self.ldbs.sam,
|
||||
self.names.rootdn,
|
||||
["msds-KeyVersionNumber"])
|
||||
self.assertFalse(hashAtt.has_key("msds-KeyVersionNumber"))
|
||||
|
||||
def test_identic_rename(self):
|
||||
rootdn = "DC=samba,DC=example,DC=com"
|
||||
|
||||
|
@ -166,6 +166,7 @@ def get_ldbs(paths, creds, session, lp):
|
||||
|
||||
return ldbs
|
||||
|
||||
|
||||
def usn_in_range(usn, range):
|
||||
"""Check if the usn is in one of the range provided.
|
||||
To do so, the value is checked to be between the lower bound and
|
||||
@ -174,25 +175,27 @@ def usn_in_range(usn, range):
|
||||
:param usn: A integer value corresponding to the usn that we want to update
|
||||
:param range: A list of integer representing ranges, lower bounds are in
|
||||
the even indices, higher in odd indices
|
||||
:return: 1 if the usn is in one of the range, 0 otherwise"""
|
||||
:return: True if the usn is in one of the range, False otherwise
|
||||
"""
|
||||
|
||||
idx = 0
|
||||
cont = 1
|
||||
ok = 0
|
||||
while (cont == 1):
|
||||
cont = True
|
||||
ok = False
|
||||
while cont:
|
||||
if idx == len(range):
|
||||
cont = 0
|
||||
cont = False
|
||||
continue
|
||||
if usn < int(range[idx]):
|
||||
if idx %2 == 1:
|
||||
ok = 1
|
||||
cont = 0
|
||||
ok = True
|
||||
cont = False
|
||||
if usn == int(range[idx]):
|
||||
cont = 0
|
||||
ok = 1
|
||||
cont = False
|
||||
ok = True
|
||||
idx = idx + 1
|
||||
return ok
|
||||
|
||||
|
||||
def get_paths(param, targetdir=None, smbconf=None):
|
||||
"""Get paths to important provision objects (smb.conf, ldb files, ...)
|
||||
|
||||
@ -237,6 +240,7 @@ def update_policyids(names, samdb):
|
||||
else:
|
||||
names.policyid_dc = None
|
||||
|
||||
|
||||
def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp):
|
||||
"""Get key provision parameters (realm, domain, ...) from a given provision
|
||||
|
||||
@ -246,8 +250,8 @@ def find_provision_key_parameters(samdb, secretsdb, idmapdb, paths, smbconf, lp)
|
||||
:param paths: A list of path to provision object
|
||||
:param smbconf: Path to the smb.conf file
|
||||
:param lp: A LoadParm object
|
||||
:return: A list of key provision parameters"""
|
||||
|
||||
:return: A list of key provision parameters
|
||||
"""
|
||||
names = ProvisionNames()
|
||||
names.adminpass = None
|
||||
|
||||
@ -408,16 +412,19 @@ def dn_sort(x, y):
|
||||
return -1
|
||||
return ret
|
||||
|
||||
|
||||
def identic_rename(ldbobj, dn):
|
||||
"""Perform a back and forth rename to trigger renaming on attribute that
|
||||
can't be directly modified.
|
||||
can't be directly modified.
|
||||
|
||||
:param lbdobj: An Ldb Object
|
||||
:param dn: DN of the object to manipulate """
|
||||
:param dn: DN of the object to manipulate
|
||||
"""
|
||||
(before, sep, after)=str(dn).partition('=')
|
||||
ldbobj.rename(dn, ldb.Dn(ldbobj, "%s=foo%s" % (before, after)))
|
||||
ldbobj.rename(ldb.Dn(ldbobj, "%s=foo%s" % (before, after)), dn)
|
||||
|
||||
|
||||
def chunck_acl(acl):
|
||||
"""Return separate ACE of an ACL
|
||||
|
||||
@ -659,7 +666,7 @@ def update_gpo(paths, samdb, names, lp, message, force=0):
|
||||
Set ACL correctly also.
|
||||
Check ACLs for sysvol/netlogon dirs also
|
||||
"""
|
||||
resetacls = 0
|
||||
resetacls = False
|
||||
try:
|
||||
ntacls.checkset_backend(lp, None, None)
|
||||
eadbname = lp.get("posix:eadb")
|
||||
@ -674,10 +681,10 @@ def update_gpo(paths, samdb, names, lp, message, force=0):
|
||||
attribute = samba.xattr_native.wrap_getxattr(paths.sysvol,
|
||||
xattr.XATTR_NTACL_NAME)
|
||||
except:
|
||||
resetacls = 1
|
||||
resetacls = True
|
||||
|
||||
if force:
|
||||
resetacls = 1
|
||||
resetacls = True
|
||||
|
||||
dir = getpolicypath(paths.sysvol, names.dnsdomain, names.policyid)
|
||||
if not os.path.isdir(dir):
|
||||
|
Reference in New Issue
Block a user