1
0
mirror of https://github.com/samba-team/samba.git synced 2025-07-31 20:22:15 +03:00

Fix PEP8 warning E501 line too long

Mostly involves splitting up long strings or comments so that they
span multiple lines. Some place-holder variables have been added in a
few places to avoid exceeding 80 chars.

Signed-off-by: Tim Beale <timbeale@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
This commit is contained in:
Tim Beale
2018-07-27 14:34:16 +12:00
committed by Andrew Bartlett
parent 7065f5299f
commit 68f8a1c274
6 changed files with 311 additions and 199 deletions

View File

@ -19,7 +19,8 @@ import samba.getopt as options
import ldb import ldb
from samba.samdb import SamDB from samba.samdb import SamDB
from samba.netcmd import (Command, CommandError, Option, SuperCommand) from samba.netcmd import (Command, CommandError, Option, SuperCommand)
from samba.dcerpc.samr import DOMAIN_PASSWORD_COMPLEX, DOMAIN_PASSWORD_STORE_CLEARTEXT from samba.dcerpc.samr import (DOMAIN_PASSWORD_COMPLEX,
DOMAIN_PASSWORD_STORE_CLEARTEXT)
from samba.auth import system_session from samba.auth import system_session
NEVER_TIMESTAMP = int(-0x8000000000000000) NEVER_TIMESTAMP = int(-0x8000000000000000)
@ -136,8 +137,8 @@ def show_pso_for_user(outf, samdb, username):
if len(res) == 0: if len(res) == 0:
outf.write("User '%s' not found.\n" % username) outf.write("User '%s' not found.\n" % username)
elif 'msDS-ResultantPSO' not in res[0]: elif 'msDS-ResultantPSO' not in res[0]:
outf.write("No PSO applies to user '%s'. The default domain settings apply.\n" outf.write("No PSO applies to user '%s'. "
% username) "The default domain settings apply.\n" % username)
outf.write("Refer to 'samba-tool domain passwordsettings show'.\n") outf.write("Refer to 'samba-tool domain passwordsettings show'.\n")
else: else:
# sanity-check user has permissions to view PSO details (non-admin # sanity-check user has permissions to view PSO details (non-admin
@ -234,15 +235,18 @@ def check_pso_constraints(min_pwd_length=None, history_length=None,
# check values as per section 3.1.1.5.2.2 Constraints in MS-ADTS spec # check values as per section 3.1.1.5.2.2 Constraints in MS-ADTS spec
if history_length is not None and history_length > 1024: if history_length is not None and history_length > 1024:
raise CommandError("Bad password history length: valid range is 0 to 1024") raise CommandError("Bad password history length: "
"valid range is 0 to 1024")
if min_pwd_length is not None and min_pwd_length > 255: if min_pwd_length is not None and min_pwd_length > 255:
raise CommandError("Bad minimum password length: valid range is 0 to 255") raise CommandError("Bad minimum password length: "
"valid range is 0 to 255")
if min_pwd_age is not None and max_pwd_age is not None: if min_pwd_age is not None and max_pwd_age is not None:
# note max-age=zero is a special case meaning 'never expire' # note max-age=zero is a special case meaning 'never expire'
if min_pwd_age >= max_pwd_age and max_pwd_age != 0: if min_pwd_age >= max_pwd_age and max_pwd_age != 0:
raise CommandError("Minimum password age must be less than the maximum age") raise CommandError("Minimum password age must be less than "
"maximum age")
# the same args are used for both create and set commands # the same args are used for both create and set commands
@ -250,21 +254,29 @@ pwd_settings_options = [
Option("--complexity", type="choice", choices=["on", "off"], Option("--complexity", type="choice", choices=["on", "off"],
help="The password complexity (on | off)."), help="The password complexity (on | off)."),
Option("--store-plaintext", type="choice", choices=["on", "off"], Option("--store-plaintext", type="choice", choices=["on", "off"],
help="Store plaintext passwords where account have 'store passwords with reversible encryption' set (on | off)."), help="Store plaintext passwords where account have "
"'store passwords with reversible encryption' set (on | off)."),
Option("--history-length", Option("--history-length",
help="The password history length (<integer>).", type=int), help="The password history length (<integer>).", type=int),
Option("--min-pwd-length", Option("--min-pwd-length",
help="The minimum password length (<integer>).", type=int), help="The minimum password length (<integer>).", type=int),
Option("--min-pwd-age", Option("--min-pwd-age",
help="The minimum password age (<integer in days>). Default is domain setting.", type=int), help=("The minimum password age (<integer in days>). "
"Default is domain setting."), type=int),
Option("--max-pwd-age", Option("--max-pwd-age",
help="The maximum password age (<integer in days>). Default is domain setting.", type=int), help=("The maximum password age (<integer in days>). "
Option("--account-lockout-duration", "Default is domain setting."), type=int),
help="The the length of time an account is locked out after exeeding the limit on bad password attempts (<integer in mins>). Default is domain setting", type=int), Option("--account-lockout-duration", type=int,
Option("--account-lockout-threshold", help=("The length of time an account is locked out after exceeding "
help="The number of bad password attempts allowed before locking out the account (<integer>). Default is domain setting.", type=int), "the limit on bad password attempts (<integer in mins>). "
"Default is domain setting")),
Option("--account-lockout-threshold", type=int,
help=("The number of bad password attempts allowed before locking "
"out the account (<integer>). Default is domain setting.")),
Option("--reset-account-lockout-after", Option("--reset-account-lockout-after",
help="After this time is elapsed, the recorded number of attempts restarts from zero (<integer in mins>). Default is domain setting.", type=int)] help=("After this time is elapsed, the recorded number of attempts "
"restarts from zero (<integer in mins>). "
"Default is domain setting."), type=int)]
def num_options_in_args(options, args): def num_options_in_args(options, args):
@ -309,8 +321,8 @@ class cmd_domain_pwdsettings_pso_create(Command):
} }
takes_options = pwd_settings_options + [ takes_options = pwd_settings_options + [
Option("-H", "--URL", help="LDB URL for database or target server", type=str, Option("-H", "--URL", help="LDB URL for database or target server",
metavar="URL", dest="H") metavar="URL", dest="H", type=str)
] ]
takes_args = ["psoname", "precedence"] takes_args = ["psoname", "precedence"]
@ -329,7 +341,8 @@ class cmd_domain_pwdsettings_pso_create(Command):
try: try:
precedence = int(precedence) precedence = int(precedence)
except ValueError: except ValueError:
raise CommandError("The PSO's precedence should be a numerical value. Try --help") raise CommandError("The PSO's precedence should be "
"a numerical value. Try --help")
# sanity-check that the PSO doesn't already exist # sanity-check that the PSO doesn't already exist
pso_dn = "CN=%s,%s" % (psoname, pso_container(samdb)) pso_dn = "CN=%s,%s" % (psoname, pso_container(samdb))
@ -347,14 +360,17 @@ class cmd_domain_pwdsettings_pso_create(Command):
# otherwise there's no point in creating a PSO # otherwise there's no point in creating a PSO
num_pwd_args = num_options_in_args(pwd_settings_options, self.raw_argv) num_pwd_args = num_options_in_args(pwd_settings_options, self.raw_argv)
if num_pwd_args == 0: if num_pwd_args == 0:
raise CommandError("Please specify at least one password policy setting. Try --help") raise CommandError("Please specify at least one password policy "
"setting. Try --help")
# it's unlikely that the user will specify all 9 password policy # it's unlikely that the user will specify all 9 password policy
# settings on the CLI - current domain password-settings as the default # settings on the CLI - current domain password-settings as the default
# values for unspecified arguments # values for unspecified arguments
if num_pwd_args < len(pwd_settings_options): if num_pwd_args < len(pwd_settings_options):
self.message("Not all password policy options have been specified.") self.message("Not all password policy options "
self.message("For unspecified options, the current domain password settings will be used as the default values.") "have been specified.")
self.message("For unspecified options, the current domain password"
" settings will be used as the default values.")
# lookup the current domain password-settings # lookup the current domain password-settings
res = samdb.search(samdb.domain_dn(), scope=ldb.SCOPE_BASE, res = samdb.search(samdb.domain_dn(), scope=ldb.SCOPE_BASE,
@ -420,7 +436,8 @@ class cmd_domain_pwdsettings_pso_create(Command):
except ldb.LdbError as e: except ldb.LdbError as e:
(num, msg) = e.args (num, msg) = e.args
if num == ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS: if num == ldb.ERR_INSUFFICIENT_ACCESS_RIGHTS:
raise CommandError("Administrator permissions are needed to create a PSO.") raise CommandError("Administrator permissions are needed "
"to create a PSO.")
else: else:
raise CommandError("Failed to create PSO '%s': %s" % (pso_dn, raise CommandError("Failed to create PSO '%s': %s" % (pso_dn,
msg)) msg))
@ -439,7 +456,8 @@ class cmd_domain_pwdsettings_pso_set(Command):
takes_options = pwd_settings_options + [ takes_options = pwd_settings_options + [
Option("--precedence", type=int, Option("--precedence", type=int,
help="This PSO's precedence relative to other PSOs. Lower precedence is better (<integer>)."), help=("This PSO's precedence relative to other PSOs. "
"Lower precedence is better (<integer>).")),
Option("-H", "--URL", help="LDB URL for database or target server", Option("-H", "--URL", help="LDB URL for database or target server",
type=str, metavar="URL", dest="H"), type=str, metavar="URL", dest="H"),
] ]
@ -464,19 +482,23 @@ class cmd_domain_pwdsettings_pso_set(Command):
# we expect the user to specify at least one password-policy setting # we expect the user to specify at least one password-policy setting
num_pwd_args = num_options_in_args(pwd_settings_options, self.raw_argv) num_pwd_args = num_options_in_args(pwd_settings_options, self.raw_argv)
if num_pwd_args == 0 and precedence is None: if num_pwd_args == 0 and precedence is None:
raise CommandError("Please specify at least one password policy setting. Try --help") raise CommandError("Please specify at least one password policy "
"setting. Try --help")
if min_pwd_age is not None or max_pwd_age is not None: if min_pwd_age is not None or max_pwd_age is not None:
# if we're modifying either the max or min pwd-age, check the max is # if we're modifying either the max or min pwd-age, check the max
# always larger. We may have to fetch the PSO's setting to verify this # is always larger. We may have to fetch the PSO's setting to
# verify this
res = samdb.search(pso_dn, scope=ldb.SCOPE_BASE, res = samdb.search(pso_dn, scope=ldb.SCOPE_BASE,
attrs=['msDS-MinimumPasswordAge', attrs=['msDS-MinimumPasswordAge',
'msDS-MaximumPasswordAge']) 'msDS-MaximumPasswordAge'])
if min_pwd_age is None: if min_pwd_age is None:
min_pwd_age = timestamp_to_days(res[0]['msDS-MinimumPasswordAge'][0]) min_pwd_ticks = res[0]['msDS-MinimumPasswordAge'][0]
min_pwd_age = timestamp_to_days(min_pwd_ticks)
if max_pwd_age is None: if max_pwd_age is None:
max_pwd_age = timestamp_to_days(res[0]['msDS-MaximumPasswordAge'][0]) max_pwd_ticks = res[0]['msDS-MaximumPasswordAge'][0]
max_pwd_age = timestamp_to_days(max_pwd_ticks)
check_pso_constraints(max_pwd_age=max_pwd_age, min_pwd_age=min_pwd_age, check_pso_constraints(max_pwd_age=max_pwd_age, min_pwd_age=min_pwd_age,
history_length=history_length, history_length=history_length,
@ -516,8 +538,8 @@ class cmd_domain_pwdsettings_pso_delete(Command):
} }
takes_options = [ takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str, Option("-H", "--URL", help="LDB URL for database or target server",
metavar="URL", dest="H") metavar="URL", dest="H", type=str)
] ]
takes_args = ["psoname"] takes_args = ["psoname"]
@ -556,8 +578,8 @@ class cmd_domain_pwdsettings_pso_list(Command):
} }
takes_options = [ takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str, Option("-H", "--URL", help="LDB URL for database or target server",
metavar="URL", dest="H") metavar="URL", dest="H", type=str)
] ]
def run(self, H=None, credopts=None, sambaopts=None, versionopts=None): def run(self, H=None, credopts=None, sambaopts=None, versionopts=None):
@ -574,7 +596,8 @@ class cmd_domain_pwdsettings_pso_list(Command):
# an unprivileged search against Windows returns nothing here. On Samba # an unprivileged search against Windows returns nothing here. On Samba
# we get the PSO names, but not their attributes # we get the PSO names, but not their attributes
if len(res) == 0 or 'msDS-PasswordSettingsPrecedence' not in res[0]: if len(res) == 0 or 'msDS-PasswordSettingsPrecedence' not in res[0]:
self.outf.write("No PSOs are present, or you don't have permission to view them.\n") self.outf.write("No PSOs are present, or you don't have permission"
" to view them.\n")
return return
# sort the PSOs so they're displayed in order of precedence # sort the PSOs so they're displayed in order of precedence
@ -600,8 +623,8 @@ class cmd_domain_pwdsettings_pso_show(Command):
} }
takes_options = [ takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str, Option("-H", "--URL", help="LDB URL for database or target server",
metavar="URL", dest="H") metavar="URL", dest="H", type=str)
] ]
takes_args = ["psoname"] takes_args = ["psoname"]
@ -630,8 +653,8 @@ class cmd_domain_pwdsettings_pso_show_user(Command):
} }
takes_options = [ takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str, Option("-H", "--URL", help="LDB URL for database or target server",
metavar="URL", dest="H") metavar="URL", dest="H", type=str)
] ]
takes_args = ["username"] takes_args = ["username"]
@ -666,8 +689,8 @@ class cmd_domain_pwdsettings_pso_apply(Command):
} }
takes_options = [ takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str, Option("-H", "--URL", help="LDB URL for database or target server",
metavar="URL", dest="H") metavar="URL", dest="H", type=str)
] ]
takes_args = ["psoname", "user_or_group"] takes_args = ["psoname", "user_or_group"]
@ -696,7 +719,8 @@ class cmd_domain_pwdsettings_pso_apply(Command):
target_dn = str(res[0].dn) target_dn = str(res[0].dn)
m = ldb.Message() m = ldb.Message()
m.dn = ldb.Dn(samdb, pso_dn) m.dn = ldb.Dn(samdb, pso_dn)
m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn, ldb.FLAG_MOD_ADD, m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn,
ldb.FLAG_MOD_ADD,
"msDS-PSOAppliesTo") "msDS-PSOAppliesTo")
try: try:
samdb.modify(m) samdb.modify(m)
@ -725,8 +749,8 @@ class cmd_domain_pwdsettings_pso_unapply(Command):
} }
takes_options = [ takes_options = [
Option("-H", "--URL", help="LDB URL for database or target server", type=str, Option("-H", "--URL", help="LDB URL for database or target server",
metavar="URL", dest="H"), metavar="URL", dest="H", type=str),
] ]
takes_args = ["psoname", "user_or_group"] takes_args = ["psoname", "user_or_group"]
@ -755,7 +779,8 @@ class cmd_domain_pwdsettings_pso_unapply(Command):
target_dn = str(res[0].dn) target_dn = str(res[0].dn)
m = ldb.Message() m = ldb.Message()
m.dn = ldb.Dn(samdb, pso_dn) m.dn = ldb.Dn(samdb, pso_dn)
m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn, ldb.FLAG_MOD_DELETE, m["msDS-PSOAppliesTo"] = ldb.MessageElement(target_dn,
ldb.FLAG_MOD_DELETE,
"msDS-PSOAppliesTo") "msDS-PSOAppliesTo")
try: try:
samdb.modify(m) samdb.modify(m)

View File

@ -56,8 +56,9 @@ class TestUser:
if hist_len == 0: if hist_len == 0:
return self.all_old_passwords[:] return self.all_old_passwords[:]
# just exclude our pwd_history if there's not much in it. This can happen # just exclude our pwd_history if there's not much in it. This can
# if we've been using a lower PasswordHistoryLength setting previously # happen if we've been using a lower PasswordHistoryLength setting
# previously
hist_len = min(len(self.pwd_history), hist_len) hist_len = min(len(self.pwd_history), hist_len)
# return any passwords up to the nth-from-last item # return any passwords up to the nth-from-last item
@ -67,8 +68,9 @@ class TestUser:
"""Updates the user's password history to reflect a password change""" """Updates the user's password history to reflect a password change"""
# we maintain 2 lists: all passwords the user has ever had, and an # we maintain 2 lists: all passwords the user has ever had, and an
# effective password-history that should roughly mirror the DC. # effective password-history that should roughly mirror the DC.
# pwd_history_change() handles the corner-case where we need to truncate # pwd_history_change() handles the corner-case where we need to
# password-history due to PasswordHistoryLength settings changes # truncate password-history due to PasswordHistoryLength settings
# changes
if new_password in self.all_old_passwords: if new_password in self.all_old_passwords:
self.all_old_passwords.remove(new_password) self.all_old_passwords.remove(new_password)
self.all_old_passwords.append(new_password) self.all_old_passwords.append(new_password)
@ -102,15 +104,16 @@ add: userPassword
userPassword: %s userPassword: %s
""" % (self.dn, self.get_password(), new_password) """ % (self.dn, self.get_password(), new_password)
# this modify will throw an exception if new_password doesn't meet the # this modify will throw an exception if new_password doesn't meet the
# PSO constraints (which the test code catches if it's expected to fail) # PSO constraints (which the test code catches if it's expected to
# fail)
self.ldb.modify_ldif(ldif) self.ldb.modify_ldif(ldif)
self.update_pwd_history(new_password) self.update_pwd_history(new_password)
def pwd_history_change(self, old_hist_len, new_hist_len): def pwd_history_change(self, old_hist_len, new_hist_len):
""" """
Updates what in the password history will take effect, to reflect changes Updates the effective password history, to reflect changes on the DC.
on the DC. When the PasswordHistoryLength applied to a user changes from When the PasswordHistoryLength applied to a user changes from a low
a low setting (e.g. 2) to a higher setting (e.g. 4), passwords #3 and #4 setting (e.g. 2) to a higher setting (e.g. 4), passwords #3 and #4
won't actually have been stored on the DC, so we need to make sure they won't actually have been stored on the DC, so we need to make sure they
are removed them from our mirror pwd_history list. are removed them from our mirror pwd_history list.
""" """
@ -267,4 +270,3 @@ msDS-PasswordSettingsPrecedence: %u
""" % (self.dn, new_precedence) """ % (self.dn, new_precedence)
samdb.modify_ldif(ldif) samdb.modify_ldif(ldif)
self.precedence = new_precedence self.precedence = new_precedence

View File

@ -31,8 +31,8 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
self.user_auth = "-U%s%%%s" % (os.environ["DC_USERNAME"], self.user_auth = "-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]) os.environ["DC_PASSWORD"])
self.ldb = self.getSamDB("-H", self.server, self.user_auth) self.ldb = self.getSamDB("-H", self.server, self.user_auth)
self.pso_container = \ system_dn = "CN=System,%s" % self.ldb.domain_dn()
"CN=Password Settings Container,CN=System,%s" % self.ldb.domain_dn() self.pso_container = "CN=Password Settings Container,%s" % system_dn
self.obj_cleanup = [] self.obj_cleanup = []
def tearDown(self): def tearDown(self):
@ -48,9 +48,12 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
dn = "CN=%s,%s" % (pso_name, self.pso_container) dn = "CN=%s,%s" % (pso_name, self.pso_container)
pso_attrs = ['name', 'msDS-PasswordSettingsPrecedence', pso_attrs = ['name', 'msDS-PasswordSettingsPrecedence',
'msDS-PasswordReversibleEncryptionEnabled', 'msDS-PasswordReversibleEncryptionEnabled',
'msDS-PasswordHistoryLength', 'msDS-MinimumPasswordLength', 'msDS-PasswordHistoryLength',
'msDS-PasswordComplexityEnabled', 'msDS-MinimumPasswordAge', 'msDS-MinimumPasswordLength',
'msDS-MaximumPasswordAge', 'msDS-LockoutObservationWindow', 'msDS-PasswordComplexityEnabled',
'msDS-MinimumPasswordAge',
'msDS-MaximumPasswordAge',
'msDS-LockoutObservationWindow',
'msDS-LockoutThreshold', 'msDS-LockoutDuration'] 'msDS-LockoutThreshold', 'msDS-LockoutDuration']
res = self.ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=pso_attrs) res = self.ldb.search(dn, scope=ldb.SCOPE_BASE, attrs=pso_attrs)
self.assertEquals(len(res), 1, "PSO lookup failed") self.assertEquals(len(res), 1, "PSO lookup failed")
@ -67,8 +70,8 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
# check the PSO's settings match the search results # check the PSO's settings match the search results
self.assertEquals(str(res[0]['msDS-PasswordComplexityEnabled'][0]), self.assertEquals(str(res[0]['msDS-PasswordComplexityEnabled'][0]),
complexity_str) complexity_str)
self.assertEquals(str(res[0]['msDS-PasswordReversibleEncryptionEnabled'][0]), plaintext_res = res[0]['msDS-PasswordReversibleEncryptionEnabled'][0]
plaintext_str) self.assertEquals(str(plaintext_res), plaintext_str)
self.assertEquals(int(res[0]['msDS-PasswordHistoryLength'][0]), self.assertEquals(int(res[0]['msDS-PasswordHistoryLength'][0]),
pso.history_len) pso.history_len)
self.assertEquals(int(res[0]['msDS-MinimumPasswordLength'][0]), self.assertEquals(int(res[0]['msDS-MinimumPasswordLength'][0]),
@ -89,13 +92,14 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
"pso", "show"), pso_name, "pso", "show"), pso_name,
"-H", self.server, "-H", self.server,
self.user_auth) self.user_auth)
self.assertTrue(len(out.split(":")) >= 10, "Expect 10 fields displayed") self.assertTrue(len(out.split(":")) >= 10,
"Expect 10 fields displayed")
# for a few settings, sanity-check the display is what we expect # for a few settings, sanity-check the display is what we expect
self.assertIn("Minimum password length: %u" % pso.password_len, out) self.assertIn("Minimum password length: %u" % pso.password_len, out)
self.assertIn("Password history length: %u" % pso.history_len, out) self.assertIn("Password history length: %u" % pso.history_len, out)
self.assertIn("lockout threshold (attempts): %u" % pso.lockout_attempts, lockout_str = "lockout threshold (attempts): %u" % pso.lockout_attempts
out) self.assertIn(lockout_str, out)
def test_pso_create(self): def test_pso_create(self):
"""Tests basic PSO creation using the samba-tool""" """Tests basic PSO creation using the samba-tool"""
@ -207,14 +211,14 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
pso_settings.precedence = 99 pso_settings.precedence = 99
pso_settings.lockout_attempts = 10 pso_settings.lockout_attempts = 10
pso_settings.lockout_duration = 60 * 17 pso_settings.lockout_duration = 60 * 17
(result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", (res, out, err) = self.runsublevelcmd("domain", ("passwordsettings",
"pso", "set"), pso_name, "pso", "set"), pso_name,
"--precedence=99", "--precedence=99",
"--account-lockout-threshold=10", "--account-lockout-threshold=10",
"--account-lockout-duration=17", "--account-lockout-duration=17",
"-H", self.server, "-H", self.server,
self.user_auth) self.user_auth)
self.assertCmdSuccess(result, out, err) self.assertCmdSuccess(res, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages") self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("Successfully updated", out) self.assertIn("Successfully updated", out)
@ -259,8 +263,8 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
# first check the samba-tool output tells us the correct PSO is applied # first check the samba-tool output tells us the correct PSO is applied
(result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings",
"pso", "show-user"), user.name, "pso", "show-user"),
"-H", self.server, user.name, "-H", self.server,
self.user_auth) self.user_auth)
self.assertCmdSuccess(result, out, err) self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages") self.assertEquals(err, "", "Shouldn't be any error messages")
@ -363,19 +367,22 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
(result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings",
"pso", "create"), "bad-perm", "pso", "create"), "bad-perm",
"250", "--complexity=off", "250", "--complexity=off",
"-H", self.server, unpriv_auth) "-H", self.server,
unpriv_auth)
self.assertCmdFail(result, "Need admin privileges to modify PSO") self.assertCmdFail(result, "Need admin privileges to modify PSO")
self.assertIn("Administrator permissions are needed", err) self.assertIn("Administrator permissions are needed", err)
(result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings",
"pso", "delete"), pso_name, "pso", "delete"), pso_name,
"-H", self.server, unpriv_auth) "-H", self.server,
unpriv_auth)
self.assertCmdFail(result, "Need admin privileges to delete PSO") self.assertCmdFail(result, "Need admin privileges to delete PSO")
self.assertIn("You may not have permission", err) self.assertIn("You may not have permission", err)
(result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings",
"pso", "show"), pso_name, "pso", "show"), pso_name,
"-H", self.server, unpriv_auth) "-H", self.server,
unpriv_auth)
self.assertCmdFail(result, "Need admin privileges to view PSO") self.assertCmdFail(result, "Need admin privileges to view PSO")
self.assertIn("You may not have permission", err) self.assertIn("You may not have permission", err)
@ -420,9 +427,9 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
# check we can change the domain setting # check we can change the domain setting
self.addCleanup(self.ldb.set_minPwdLength, min_pwd_len) self.addCleanup(self.ldb.set_minPwdLength, min_pwd_len)
new_len = int(min_pwd_len) + 3 new_len = int(min_pwd_len) + 3
min_pwd_args = "--min-pwd-length=%u" % new_len
(result, out, err) = self.runsublevelcmd("domain", ("passwordsettings", (result, out, err) = self.runsublevelcmd("domain", ("passwordsettings",
"set"), "set"), min_pwd_args,
"--min-pwd-length=%u" % new_len,
"-H", self.server, "-H", self.server,
self.user_auth) self.user_auth)
self.assertCmdSuccess(result, out, err) self.assertCmdSuccess(result, out, err)
@ -437,4 +444,3 @@ class PwdSettingsCmdTestCase(SambaToolCmdTest):
self.assertCmdSuccess(result, out, err) self.assertCmdSuccess(result, out, err)
self.assertEquals(err, "", "Shouldn't be any error messages") self.assertEquals(err, "", "Shouldn't be any error messages")
self.assertIn("Minimum password length: %u" % new_len, out) self.assertIn("Minimum password length: %u" % new_len, out)

View File

@ -26,7 +26,8 @@
# Usage: # Usage:
# export SERVER_IP=target_dc # export SERVER_IP=target_dc
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
# PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN password_settings -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" # PYTHONPATH="$PYTHONPATH:$samba4srcdir/dsdb/tests/python" $SUBUNITRUN \
# password_settings -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
# #
import samba.tests import samba.tests
@ -37,6 +38,7 @@ import time
from samba.tests.password_test import PasswordTestCase from samba.tests.password_test import PasswordTestCase
from samba.tests.pso import TestUser from samba.tests.pso import TestUser
from samba.tests.pso import PasswordSettings from samba.tests.pso import PasswordSettings
from samba.tests import env_get_var_value
from samba.credentials import Credentials from samba.credentials import Credentials
from samba import gensec from samba import gensec
import base64 import base64
@ -46,7 +48,7 @@ class PasswordSettingsTestCase(PasswordTestCase):
def setUp(self): def setUp(self):
super(PasswordSettingsTestCase, self).setUp() super(PasswordSettingsTestCase, self).setUp()
self.host_url = "ldap://%s" % samba.tests.env_get_var_value("SERVER_IP") self.host_url = "ldap://%s" % env_get_var_value("SERVER_IP")
self.ldb = samba.tests.connect_samdb(self.host_url) self.ldb = samba.tests.connect_samdb(self.host_url)
# create a temp OU to put this test's users into # create a temp OU to put this test's users into
@ -65,7 +67,7 @@ class PasswordSettingsTestCase(PasswordTestCase):
# remove all objects under the top-level OU # remove all objects under the top-level OU
self.ldb.delete(self.ou, ["tree_delete:1"]) self.ldb.delete(self.ou, ["tree_delete:1"])
# PSOs can't reside within an OU so they need to be cleaned up separately # PSOs can't reside within an OU so they get cleaned up separately
for obj in self.test_objs: for obj in self.test_objs:
self.ldb.delete(obj) self.ldb.delete(obj)
@ -79,7 +81,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.ldb.add({"dn": dn, "objectclass": "group"}) self.ldb.add({"dn": dn, "objectclass": "group"})
return dn return dn
def set_attribute(self, dn, attr, value, operation=FLAG_MOD_ADD, samdb=None): def set_attribute(self, dn, attr, value, operation=FLAG_MOD_ADD,
samdb=None):
"""Modifies an attribute for an object""" """Modifies an attribute for an object"""
if samdb is None: if samdb is None:
samdb = self.ldb samdb = self.ldb
@ -114,7 +117,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
except ldb.LdbError as e: except ldb.LdbError as e:
(num, msg) = e.args (num, msg) = e.args
# fail the test (rather than throw an error) # fail the test (rather than throw an error)
self.fail("Password '%s' unexpectedly rejected: %s" % (password, msg)) self.fail("Password '%s' unexpectedly rejected: %s" % (password,
msg))
def assert_PSO_applied(self, user, pso): def assert_PSO_applied(self, user, pso):
""" """
@ -191,7 +195,7 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.assert_password_valid(user, password) self.assert_password_valid(user, password)
def test_pso_basics(self): def test_pso_basics(self):
"""Simple tests that a PSO takes effect when applied to a group or user""" """Simple tests that a PSO takes effect when applied to a group/user"""
# create some PSOs that vary in priority and basic password-len # create some PSOs that vary in priority and basic password-len
best_pso = PasswordSettings("highest-priority-PSO", self.ldb, best_pso = PasswordSettings("highest-priority-PSO", self.ldb,
@ -237,7 +241,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.assert_PSO_applied(user, best_pso) self.assert_PSO_applied(user, best_pso)
# delete a group membership and check the PSO changes # delete a group membership and check the PSO changes
self.set_attribute(group3, "member", user.dn, operation=FLAG_MOD_DELETE) self.set_attribute(group3, "member", user.dn,
operation=FLAG_MOD_DELETE)
self.assert_PSO_applied(user, medium_pso) self.assert_PSO_applied(user, medium_pso)
# apply the low-precedence PSO directly to the user # apply the low-precedence PSO directly to the user
@ -311,7 +316,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
self.assert_PSO_applied(user, group2_pso) self.assert_PSO_applied(user, group2_pso)
def get_guid(self, dn): def get_guid(self, dn):
res = self.ldb.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE) res = self.ldb.search(base=dn, attrs=["objectGUID"],
scope=ldb.SCOPE_BASE)
return res[0]['objectGUID'][0] return res[0]['objectGUID'][0]
def guid_string(self, guid): def guid_string(self, guid):
@ -333,7 +339,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
best_guid = guid_list[0] best_guid = guid_list[0]
# sanity-check the mapping between GUID and DN is correct # sanity-check the mapping between GUID and DN is correct
self.assertEqual(self.guid_string(self.get_guid(mapping[best_guid].dn)), best_pso_dn = mapping[best_guid].dn
self.assertEqual(self.guid_string(self.get_guid(best_pso_dn)),
self.guid_string(best_guid)) self.guid_string(best_guid))
# return the PSO that this GUID corresponds to # return the PSO that this GUID corresponds to
@ -457,7 +464,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
pso.apply_to(user.dn) pso.apply_to(user.dn)
self.assertTrue(user.get_resultant_PSO() == pso.dn) self.assertTrue(user.get_resultant_PSO() == pso.dn)
# changing the password immediately should fail, even if password is valid # changing the password immediately should fail, even if the password
# is valid
valid_password = "min-age-passwd" valid_password = "min-age-passwd"
self.assert_password_invalid(user, valid_password) self.assert_password_invalid(user, valid_password)
# then trying the same password later should succeed # then trying the same password later should succeed
@ -481,8 +489,9 @@ class PasswordSettingsTestCase(PasswordTestCase):
user = self.add_user("testuser") user = self.add_user("testuser")
# we can't wait around long enough for the max-age to expire, so instead # we can't wait around long enough for the max-age to expire, so
# just check the msDS-UserPasswordExpiryTimeComputed for the user # instead just check the msDS-UserPasswordExpiryTimeComputed for
# the user
attrs = ['msDS-UserPasswordExpiryTimeComputed'] attrs = ['msDS-UserPasswordExpiryTimeComputed']
res = self.ldb.search(user.dn, attrs=attrs) res = self.ldb.search(user.dn, attrs=attrs)
domain_expiry = int(res[0]['msDS-UserPasswordExpiryTimeComputed'][0]) domain_expiry = int(res[0]['msDS-UserPasswordExpiryTimeComputed'][0])
@ -519,9 +528,10 @@ class PasswordSettingsTestCase(PasswordTestCase):
precedence=2, password_len=10) precedence=2, password_len=10)
self.add_obj_cleanup([default_pso.dn, guest_pso.dn, admin_pso.dn, self.add_obj_cleanup([default_pso.dn, guest_pso.dn, admin_pso.dn,
builtin_pso.dn]) builtin_pso.dn])
domain_users = "CN=Domain Users,CN=Users,%s" % self.ldb.domain_dn() base_dn = self.ldb.domain_dn()
domain_guests = "CN=Domain Guests,CN=Users,%s" % self.ldb.domain_dn() domain_users = "CN=Domain Users,CN=Users,%s" % base_dn
admin_users = "CN=Domain Admins,CN=Users,%s" % self.ldb.domain_dn() domain_guests = "CN=Domain Guests,CN=Users,%s" % base_dn
admin_users = "CN=Domain Admins,CN=Users,%s" % base_dn
# if we apply a PSO to Domain Users (which all users are a member of) # if we apply a PSO to Domain Users (which all users are a member of)
# then that PSO should take effect on a new user # then that PSO should take effect on a new user
@ -532,8 +542,8 @@ class PasswordSettingsTestCase(PasswordTestCase):
# Apply a PSO to a builtin group. 'Domain Users' should be a member of # Apply a PSO to a builtin group. 'Domain Users' should be a member of
# Builtin/Users, but builtin groups should be excluded from the PSO # Builtin/Users, but builtin groups should be excluded from the PSO
# calculation, so this should have no effect # calculation, so this should have no effect
builtin_pso.apply_to("CN=Users,CN=Builtin,%s" % self.ldb.domain_dn()) builtin_pso.apply_to("CN=Users,CN=Builtin,%s" % base_dn)
builtin_pso.apply_to("CN=Administrators,CN=Builtin,%s" % self.ldb.domain_dn()) builtin_pso.apply_to("CN=Administrators,CN=Builtin,%s" % base_dn)
self.assert_PSO_applied(user, default_pso) self.assert_PSO_applied(user, default_pso)
# change the user's primary group to another group (the primaryGroupID # change the user's primary group to another group (the primaryGroupID
@ -798,7 +808,9 @@ unicodePwd:: %s
def set_domain_pwdHistoryLength(self, value): def set_domain_pwdHistoryLength(self, value):
m = ldb.Message() m = ldb.Message()
m.dn = ldb.Dn(self.ldb, self.ldb.domain_dn()) m.dn = ldb.Dn(self.ldb, self.ldb.domain_dn())
m["pwdHistoryLength"] = ldb.MessageElement(value, ldb.FLAG_MOD_REPLACE, "pwdHistoryLength") m["pwdHistoryLength"] = ldb.MessageElement(value,
ldb.FLAG_MOD_REPLACE,
"pwdHistoryLength")
self.ldb.modify(m) self.ldb.modify(m)
def test_domain_pwd_history(self): def test_domain_pwd_history(self):
@ -852,8 +864,3 @@ unicodePwd:: %s
# *next* time the user's password changes. # *next* time the user's password changes.
self.set_domain_pwdHistoryLength("1") self.set_domain_pwdHistoryLength("1")
self.assert_password_invalid(user, "NewPwd12#") self.assert_password_invalid(user, "NewPwd12#")

View File

@ -24,7 +24,8 @@
# export DC1=dc1_dns_name # export DC1=dc1_dns_name
# export DC2=dc2_dns_name # export DC2=dc2_dns_name
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
# getncchanges -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
# #
from __future__ import print_function from __future__ import print_function
@ -101,7 +102,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def start_new_repl_cycle(self): def start_new_repl_cycle(self):
"""Resets enough state info to start a new replication cycle""" """Resets enough state info to start a new replication cycle"""
# reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know # reset rxd_links, but leave rxd_guids and rxd_dn_list alone so we know
# whether a parent/target is unknown and needs GET_ANC/GET_TGT to resolve # whether a parent/target is unknown and needs GET_ANC/GET_TGT to
# resolve
self.rxd_links = [] self.rxd_links = []
self.used_get_tgt = False self.used_get_tgt = False
@ -159,13 +161,16 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# Note that with GET_ANC Windows can end up sending the same parent # Note that with GET_ANC Windows can end up sending the same parent
# object multiple times, so this might be noteworthy but doesn't # object multiple times, so this might be noteworthy but doesn't
# warrant failing the test # warrant failing the test
if (len(received_list) != len(expected_list)): num_received = len(received_list)
print("Note: received %d objects but expected %d" % (len(received_list), num_expected = len(expected_list)
len(expected_list))) if num_received != num_expected:
print("Note: received %d objects but expected %d" % (num_received,
num_expected))
# Check that we received every object that we were expecting # Check that we received every object that we were expecting
for dn in expected_list: for dn in expected_list:
self.assertTrue(dn in received_list, "DN '%s' missing from replication." % dn) self.assertTrue(dn in received_list,
"DN '%s' missing from replication." % dn)
def test_repl_integrity(self): def test_repl_integrity(self):
""" """
@ -176,8 +181,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# The server behaviour differs between samba and Windows. Samba returns # The server behaviour differs between samba and Windows. Samba returns
# the objects in the original order (up to the pre-modify HWM). Windows # the objects in the original order (up to the pre-modify HWM). Windows
# incorporates the modified objects and returns them in the new order # incorporates the modified objects and returns them in the new order
# (i.e. modified objects last), up to the post-modify HWM. The Microsoft # (i.e. modified objects last), up to the post-modify HWM. The
# docs state the Windows behaviour is optional. # Microsoft docs state the Windows behaviour is optional.
# Create a range of objects to replicate. # Create a range of objects to replicate.
expected_dn_list = self.create_object_range(0, 400) expected_dn_list = self.create_object_range(0, 400)
@ -188,11 +193,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# so long as by the end we've received everything # so long as by the end we've received everything
self.repl_get_next() self.repl_get_next()
# Modify some of the second page of objects. This should bump the highwatermark # Modify some of the second page of objects. This should bump the
# highwatermark
for x in range(100, 200): for x in range(100, 200):
self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x) self.modify_object(expected_dn_list[x], "displayName", "OU%d" % x)
(post_modify_hwm, unused) = self._get_highest_hwm_utdv(self.test_ldb_dc) (post_modify_hwm, _) = self._get_highest_hwm_utdv(self.test_ldb_dc)
self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn) self.assertTrue(post_modify_hwm.highest_usn > orig_hwm.highest_usn)
# Get the remaining blocks of data # Get the remaining blocks of data
@ -223,7 +229,9 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
return parent_dn == self.ou or parent_dn in known_dn_list return parent_dn == self.ou or parent_dn in known_dn_list
def _repl_send_request(self, get_anc=False, get_tgt=False): def _repl_send_request(self, get_anc=False, get_tgt=False):
"""Sends a GetNCChanges request for the next block of replication data.""" """
Sends a GetNCChanges request for the next block of replication data.
"""
# we're just trying to mimic regular client behaviour here, so just # we're just trying to mimic regular client behaviour here, so just
# use the highwatermark in the last response we received # use the highwatermark in the last response we received
@ -240,7 +248,7 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
more_flags = 0 more_flags = 0
if get_anc: if get_anc:
replica_flags = drsuapi.DRSUAPI_DRS_WRIT_REP | drsuapi.DRSUAPI_DRS_GET_ANC replica_flags |= drsuapi.DRSUAPI_DRS_GET_ANC
self.used_get_anc = True self.used_get_anc = True
if get_tgt: if get_tgt:
@ -252,6 +260,7 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
max_objects=self.max_objects, max_objects=self.max_objects,
highwatermark=highwatermark, highwatermark=highwatermark,
uptodateness_vector=uptodateness_vector, uptodateness_vector=uptodateness_vector,
more_flags=more_flags) more_flags=more_flags)
def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False): def repl_get_next(self, get_anc=False, get_tgt=False, assert_links=False):
@ -298,11 +307,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
return self.repl_get_next(get_anc=True, get_tgt=get_tgt, return self.repl_get_next(get_anc=True, get_tgt=get_tgt,
assert_links=assert_links) assert_links=assert_links)
# check we know about references to any objects in the linked attritbutes # check we know about references to any objects in the linked attrs
received_links = self._get_ctr6_links(ctr6) received_links = self._get_ctr6_links(ctr6)
# This is so that older versions of Samba fail - we want the links to be # This is so that older versions of Samba fail - we want the links to
# sent roughly with the objects, rather than getting all links at the end # be sent roughly with the objects, rather than getting all links at
# the end
if assert_links: if assert_links:
self.assertTrue(len(received_links) > 0, self.assertTrue(len(received_links) > 0,
"Links were expected in the GetNCChanges response") "Links were expected in the GetNCChanges response")
@ -363,8 +373,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def test_repl_integrity_get_anc(self): def test_repl_integrity_get_anc(self):
""" """
Modify the parent objects being replicated while the replication is still Modify the parent objects being replicated while the replication is
in progress (using GET_ANC) and check that no object loss occurs. still in progress (using GET_ANC) and check that no object loss occurs.
""" """
# Note that GET_ANC behaviour varies between Windows and Samba. # Note that GET_ANC behaviour varies between Windows and Samba.
@ -453,13 +463,15 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# Look up the link attribute in the DB # Look up the link attribute in the DB
# The extended_dn option will dump the GUID info for the link # The extended_dn option will dump the GUID info for the link
# attribute (as a hex blob) # attribute (as a hex blob)
res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn), attrs=[link_attr], res = self.test_ldb_dc.search(ldb.Dn(self.test_ldb_dc, dn),
controls=['extended_dn:1:0'], scope=ldb.SCOPE_BASE) attrs=[link_attr],
controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE)
# We didn't find the expected link attribute in the DB for the object. # We didn't find the expected link attribute in the DB for the object.
# Something has gone wrong somewhere... # Something has gone wrong somewhere...
self.assertTrue(link_attr in res[0], "%s in DB doesn't have attribute %s" self.assertTrue(link_attr in res[0],
% (dn, link_attr)) "%s in DB doesn't have attribute %s" % (dn, link_attr))
# find the received link in the list and assert that the target and # find the received link in the list and assert that the target and
# source GUIDs match what's in the DB # source GUIDs match what's in the DB
@ -481,7 +493,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
print("Link %s --> %s" % (dn[:25], link.targetDN[:25])) print("Link %s --> %s" % (dn[:25], link.targetDN[:25]))
break break
self.assertTrue(found, "Did not receive expected link for DN %s" % dn) self.assertTrue(found,
"Did not receive expected link for DN %s" % dn)
def test_repl_get_tgt(self): def test_repl_get_tgt(self):
""" """
@ -512,7 +525,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# with GET_TGT # with GET_TGT
while not self.replication_complete(): while not self.replication_complete():
# get the next block of replication data (this sets GET_TGT if needed) # get the next block of replication data (this sets GET_TGT
# if needed)
self.repl_get_next(assert_links=links_expected) self.repl_get_next(assert_links=links_expected)
links_expected = len(self.rxd_links) < len(expected_links) links_expected = len(self.rxd_links) < len(expected_links)
@ -550,10 +564,12 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
self.modify_object(objectsA[i], "managedBy", objectsB[i]) self.modify_object(objectsA[i], "managedBy", objectsB[i])
for i in range(0, 100): for i in range(0, 100):
self.modify_object(objectsB[i], "managedBy", objectsC[(i + 1) % 100]) self.modify_object(objectsB[i], "managedBy",
objectsC[(i + 1) % 100])
for i in range(0, 100): for i in range(0, 100):
self.modify_object(objectsC[i], "managedBy", objectsB[(i + 1) % 100]) self.modify_object(objectsC[i], "managedBy",
objectsB[(i + 1) % 100])
all_objects = objectsA + objectsB + objectsC all_objects = objectsA + objectsB + objectsC
expected_links = all_objects expected_links = all_objects
@ -567,7 +583,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# with GET_TGT # with GET_TGT
while not self.replication_complete(): while not self.replication_complete():
# get the next block of replication data (this sets GET_TGT if needed) # get the next block of replication data (this sets GET_TGT
# if needed)
self.repl_get_next(assert_links=links_expected) self.repl_get_next(assert_links=links_expected)
links_expected = len(self.rxd_links) < len(expected_links) links_expected = len(self.rxd_links) < len(expected_links)
@ -602,8 +619,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
self.repl_get_next(get_tgt=True) self.repl_get_next(get_tgt=True)
# create the target objects and add the links. These objects should be # create the target objects and add the links. These objects should be
# outside the scope of the Samba replication cycle, but the links should # outside the scope of the Samba replication cycle, but the links
# still get sent with the source object # should still get sent with the source object
managers = self.create_object_range(0, 100, prefix="manager") managers = self.create_object_range(0, 100, prefix="manager")
for i in range(0, 100): for i in range(0, 100):
@ -643,7 +660,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# Add links from the parents to the children # Add links from the parents to the children
for x in range(0, 100): for x in range(0, 100):
self.modify_object(parent_dn_list[x], "managedBy", expected_dn_list[x + 100]) self.modify_object(parent_dn_list[x], "managedBy",
expected_dn_list[x + 100])
# add some filler objects at the end. This allows us to easily see # add some filler objects at the end. This allows us to easily see
# which chunk the links get sent in # which chunk the links get sent in
@ -706,7 +724,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# with GET_TGT and GET_ANC # with GET_TGT and GET_ANC
while not self.replication_complete(): while not self.replication_complete():
# get the next block of replication data (this sets GET_TGT/GET_ANC) # get the next block of replication data (this sets
# GET_TGT/GET_ANC)
self.repl_get_next(assert_links=links_expected) self.repl_get_next(assert_links=links_expected)
links_expected = len(self.rxd_links) < len(expected_links) links_expected = len(self.rxd_links) < len(expected_links)
@ -842,23 +861,31 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
def restore_deleted_object(self, guid, new_dn): def restore_deleted_object(self, guid, new_dn):
"""Re-animates a deleted object""" """Re-animates a deleted object"""
res = self.test_ldb_dc.search(base="<GUID=%s>" % self._GUID_string(guid), attrs=["isDeleted"], guid_str = self._GUID_string(guid)
controls=['show_deleted:1'], scope=ldb.SCOPE_BASE) res = self.test_ldb_dc.search(base="<GUID=%s>" % guid_str,
attrs=["isDeleted"],
controls=['show_deleted:1'],
scope=ldb.SCOPE_BASE)
if len(res) != 1: if len(res) != 1:
return return
msg = ldb.Message() msg = ldb.Message()
msg.dn = res[0].dn msg.dn = res[0].dn
msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "isDeleted") msg["isDeleted"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE,
msg["distinguishedName"] = ldb.MessageElement([new_dn], ldb.FLAG_MOD_REPLACE, "distinguishedName") "isDeleted")
msg["distinguishedName"] = ldb.MessageElement([new_dn],
ldb.FLAG_MOD_REPLACE,
"distinguishedName")
self.test_ldb_dc.modify(msg, ["show_deleted:1"]) self.test_ldb_dc.modify(msg, ["show_deleted:1"])
def sync_DCs(self, nc_dn=None): def sync_DCs(self, nc_dn=None):
# make sure DC1 has all the changes we've made to DC2 # make sure DC1 has all the changes we've made to DC2
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, nc_dn=nc_dn) self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2,
nc_dn=nc_dn)
def get_object_guid(self, dn): def get_object_guid(self, dn):
res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"], scope=ldb.SCOPE_BASE) res = self.test_ldb_dc.search(base=dn, attrs=["objectGUID"],
scope=ldb.SCOPE_BASE)
return res[0]['objectGUID'][0] return res[0]['objectGUID'][0]
def set_dc_connection(self, conn): def set_dc_connection(self, conn):
@ -921,7 +948,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
la_sources = self.create_object_range(0, 100, prefix="la_src") la_sources = self.create_object_range(0, 100, prefix="la_src")
la_targets = self.create_object_range(0, 100, prefix="la_tgt") la_targets = self.create_object_range(0, 100, prefix="la_tgt")
# store the target object's GUIDs (we need to know these to reanimate them) # store the target object's GUIDs (we need to know these to
# reanimate them)
target_guids = [] target_guids = []
for dn in la_targets: for dn in la_targets:
@ -984,15 +1012,16 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
self.add_object(la_source) self.add_object(la_source)
# create the link target (a server object) in the config NC # create the link target (a server object) in the config NC
sites_dn = "CN=Sites,%s" % self.config_dn
servers_dn = "CN=Servers,CN=Default-First-Site-Name,%s" % sites_dn
rand = random.randint(1, 10000000) rand = random.randint(1, 10000000)
la_target = "CN=getncchanges-%d,CN=Servers,CN=Default-First-Site-Name," \ la_target = "CN=getncchanges-%d,%s" % (rand, servers_dn)
"CN=Sites,%s" % (rand, self.config_dn)
self.add_object(la_target, objectclass="server") self.add_object(la_target, objectclass="server")
# add a cross-partition link between the two # add a cross-partition link between the two
self.modify_object(la_source, "managedBy", la_target) self.modify_object(la_source, "managedBy", la_target)
# First, sync across to the peer the NC containing the link source object # First, sync to the peer the NC containing the link source object
self.sync_DCs() self.sync_DCs()
# Now, before the peer has received the partition containing the target # Now, before the peer has received the partition containing the target
@ -1002,8 +1031,9 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
while not self.replication_complete(): while not self.replication_complete():
# pretend we've received other link targets out of order and that's # pretend we've received other link targets out of order and that's
# forced us to use GET_TGT. This checks the peer doesn't fail trying # forced us to use GET_TGT. This checks the peer doesn't fail
# to fetch a cross-partition target object that doesn't exist # trying to fetch a cross-partition target object that doesn't
# exist
self.repl_get_next(get_tgt=True) self.repl_get_next(get_tgt=True)
self.set_dc_connection(self.default_conn) self.set_dc_connection(self.default_conn)
@ -1029,14 +1059,14 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
attrs=["managedBy"], attrs=["managedBy"],
controls=['extended_dn:1:0'], controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE) scope=ldb.SCOPE_BASE)
self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute" self.assertFalse("managedBy" in res[0],
% la_source) "%s in DB still has managedBy attribute" % la_source)
res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source), res = self.test_ldb_dc.search(ldb.Dn(self.ldb_dc2, la_source),
attrs=["managedBy"], attrs=["managedBy"],
controls=['extended_dn:1:0'], controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE) scope=ldb.SCOPE_BASE)
self.assertFalse("managedBy" in res[0], "%s in DB still has managedBy attribute" self.assertFalse("managedBy" in res[0],
% la_source) "%s in DB still has managedBy attribute" % la_source)
# Check receiving a cross-partition link to a deleted target. # Check receiving a cross-partition link to a deleted target.
# Delete the target and make sure the deletion is sync'd between DCs # Delete the target and make sure the deletion is sync'd between DCs
@ -1056,8 +1086,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
attrs=["managedBy"], attrs=["managedBy"],
controls=['extended_dn:1:0'], controls=['extended_dn:1:0'],
scope=ldb.SCOPE_BASE) scope=ldb.SCOPE_BASE)
self.assertTrue("managedBy" in res[0], "%s in DB missing managedBy attribute" self.assertTrue("managedBy" in res[0],
% la_source) "%s in DB missing managedBy attribute" % la_source)
# cleanup the server object we created in the Configuration partition # cleanup the server object we created in the Configuration partition
self.test_ldb_dc.delete(la_target) self.test_ldb_dc.delete(la_target)
@ -1094,7 +1124,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# check we received all the expected objects/links # check we received all the expected objects/links
self.assert_expected_data(expected_objects) self.assert_expected_data(expected_objects)
self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500) self.assert_expected_links([la_source], link_attr="addressBookRoots2",
num_expected=500)
# Do the replication again, forcing the use of GET_TGT this time # Do the replication again, forcing the use of GET_TGT this time
self.init_test_state() self.init_test_state()
@ -1113,7 +1144,8 @@ class DrsReplicaSyncIntegrityTestCase(drs_base.DrsBaseTestCase):
# check we received all the expected objects/links # check we received all the expected objects/links
self.assert_expected_data(expected_objects) self.assert_expected_data(expected_objects)
self.assert_expected_links([la_source], link_attr="addressBookRoots2", num_expected=500) self.assert_expected_links([la_source], link_attr="addressBookRoots2",
num_expected=500)
class DcConnection: class DcConnection:
@ -1122,6 +1154,5 @@ class DcConnection:
def __init__(self, drs_base, ldb_dc, dnsname_dc): def __init__(self, drs_base, ldb_dc, dnsname_dc):
self.ldb_dc = ldb_dc self.ldb_dc = ldb_dc
(self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc) (self.drs, self.drs_handle) = drs_base._ds_bind(dnsname_dc)
(self.default_hwm, self.default_utdv) = drs_base._get_highest_hwm_utdv(ldb_dc) (self.default_hwm, utdv) = drs_base._get_highest_hwm_utdv(ldb_dc)
self.default_utdv = utdv

View File

@ -25,7 +25,8 @@
# export DC1=dc1_dns_name # export DC1=dc1_dns_name
# export DC2=dc2_dns_name # export DC2=dc2_dns_name
# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun # export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN link_conflicts -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD" # PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN \
# link_conflicts -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
# #
import drs_base import drs_base
@ -37,6 +38,7 @@ import time
from drs_base import AbstractLink from drs_base import AbstractLink
from samba.dcerpc import drsuapi, misc from samba.dcerpc import drsuapi, misc
from samba.dcerpc.drsuapi import DRSUAPI_EXOP_ERR_SUCCESS
# specifies the order to sync DCs in # specifies the order to sync DCs in
DC1_TO_DC2 = 1 DC1_TO_DC2 = 1
@ -47,7 +49,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
def setUp(self): def setUp(self):
super(DrsReplicaLinkConflictTestCase, self).setUp() super(DrsReplicaLinkConflictTestCase, self).setUp()
self.ou = samba.tests.create_test_ou(self.ldb_dc1, "test_link_conflict") self.ou = samba.tests.create_test_ou(self.ldb_dc1,
"test_link_conflict")
self.base_dn = self.ldb_dc1.get_default_basedn() self.base_dn = self.ldb_dc1.get_default_basedn()
(self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1) (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
@ -97,12 +100,16 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
"""Manually syncs the 2 DCs to ensure they're in sync""" """Manually syncs the 2 DCs to ensure they're in sync"""
if sync_order == DC1_TO_DC2: if sync_order == DC1_TO_DC2:
# sync DC1-->DC2, then DC2-->DC1 # sync DC1-->DC2, then DC2-->DC1
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1) self._net_drs_replicate(DC=self.dnsname_dc2,
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2) fromDC=self.dnsname_dc1)
self._net_drs_replicate(DC=self.dnsname_dc1,
fromDC=self.dnsname_dc2)
else: else:
# sync DC2-->DC1, then DC1-->DC2 # sync DC2-->DC1, then DC1-->DC2
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2) self._net_drs_replicate(DC=self.dnsname_dc1,
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1) fromDC=self.dnsname_dc2)
self._net_drs_replicate(DC=self.dnsname_dc2,
fromDC=self.dnsname_dc1)
def ensure_unique_timestamp(self): def ensure_unique_timestamp(self):
"""Waits a second to ensure a unique timestamp between 2 objects""" """Waits a second to ensure a unique timestamp between 2 objects"""
@ -123,12 +130,14 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
""" """
actual_len = len(res1[0][attr]) actual_len = len(res1[0][attr])
self.assertTrue(actual_len == expected_count, self.assertTrue(actual_len == expected_count,
"Expected %u %s attributes, but got %u" % (expected_count, "Expected %u %s attributes, got %u" % (expected_count,
attr, actual_len)) attr,
actual_len))
actual_len = len(res2[0][attr]) actual_len = len(res2[0][attr])
self.assertTrue(actual_len == expected_count, self.assertTrue(actual_len == expected_count,
"Expected %u %s attributes, but got %u" % (expected_count, "Expected %u %s attributes, got %u" % (expected_count,
attr, actual_len)) attr,
actual_len))
# check DCs both agree on the same linked attributes # check DCs both agree on the same linked attributes
for val in res1[0][attr]: for val in res1[0][attr]:
@ -214,7 +223,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self._check_replicated_links(src_ou, [link1, link2]) self._check_replicated_links(src_ou, [link1, link2])
def test_conflict_single_valued_link(self): def test_conflict_single_valued_link(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_conflict_single_valued_link(sync_order=DC1_TO_DC2) self._test_conflict_single_valued_link(sync_order=DC1_TO_DC2)
self._test_conflict_single_valued_link(sync_order=DC2_TO_DC1) self._test_conflict_single_valued_link(sync_order=DC2_TO_DC1)
@ -248,7 +258,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assert_attrs_match(res1, res2, "managedBy", 1) self.assert_attrs_match(res1, res2, "managedBy", 1)
def test_duplicate_single_valued_link(self): def test_duplicate_single_valued_link(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_duplicate_single_valued_link(sync_order=DC1_TO_DC2) self._test_duplicate_single_valued_link(sync_order=DC1_TO_DC2)
self._test_duplicate_single_valued_link(sync_order=DC2_TO_DC1) self._test_duplicate_single_valued_link(sync_order=DC2_TO_DC1)
@ -267,9 +278,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# create the same user (link target) on each DC. # create the same user (link target) on each DC.
# Note that the GUIDs will differ between the DCs # Note that the GUIDs will differ between the DCs
target_dn = self.unique_dn("CN=target") target_dn = self.unique_dn("CN=target")
target1_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user") target1_guid = self.add_object(self.ldb_dc1, target_dn,
objectclass="user")
self.ensure_unique_timestamp() self.ensure_unique_timestamp()
target2_guid = self.add_object(self.ldb_dc2, target_dn, objectclass="user") target2_guid = self.add_object(self.ldb_dc2, target_dn,
objectclass="user")
# link the src group to the respective target created # link the src group to the respective target created
self.add_link_attr(self.ldb_dc1, src_dn, "member", target_dn) self.add_link_attr(self.ldb_dc1, src_dn, "member", target_dn)
@ -298,7 +311,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
"Expected link to conflicting target object not found") "Expected link to conflicting target object not found")
def test_conflict_multi_valued_link(self): def test_conflict_multi_valued_link(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_conflict_multi_valued_link(sync_order=DC1_TO_DC2) self._test_conflict_multi_valued_link(sync_order=DC1_TO_DC2)
self._test_conflict_multi_valued_link(sync_order=DC2_TO_DC1) self._test_conflict_multi_valued_link(sync_order=DC2_TO_DC1)
@ -331,7 +345,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assert_attrs_match(res1, res2, "member", 1) self.assert_attrs_match(res1, res2, "member", 1)
def test_duplicate_multi_valued_link(self): def test_duplicate_multi_valued_link(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_duplicate_multi_valued_link(sync_order=DC1_TO_DC2) self._test_duplicate_multi_valued_link(sync_order=DC1_TO_DC2)
self._test_duplicate_multi_valued_link(sync_order=DC2_TO_DC1) self._test_duplicate_multi_valued_link(sync_order=DC2_TO_DC1)
@ -343,7 +358,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# create a common link target # create a common link target
target_dn = self.unique_dn("CN=target") target_dn = self.unique_dn("CN=target")
target_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user") target_guid = self.add_object(self.ldb_dc1, target_dn,
objectclass="user")
self.sync_DCs() self.sync_DCs()
# create the same group (link source) on each DC. # create the same group (link source) on each DC.
@ -367,7 +383,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
scope=SCOPE_BASE, attrs=["memberOf"]) scope=SCOPE_BASE, attrs=["memberOf"])
src1_backlink = False src1_backlink = False
# our test user should still be a member of 2 groups (check both DCs agree) # our test user should still be a member of 2 groups (check both
# DCs agree)
self.assert_attrs_match(res1, res2, "memberOf", 2) self.assert_attrs_match(res1, res2, "memberOf", 2)
for val in res1[0]["memberOf"]: for val in res1[0]["memberOf"]:
@ -377,10 +394,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
src1_backlink = True src1_backlink = True
self.assertTrue(src1_backlink, self.assertTrue(src1_backlink,
"Expected backlink to conflicting source object not found") "Backlink to conflicting source object not found")
def test_conflict_backlinks(self): def test_conflict_backlinks(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_conflict_backlinks(sync_order=DC1_TO_DC2) self._test_conflict_backlinks(sync_order=DC1_TO_DC2)
self._test_conflict_backlinks(sync_order=DC2_TO_DC1) self._test_conflict_backlinks(sync_order=DC2_TO_DC1)
@ -424,12 +442,15 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
res2 = self.ldb_dc2.search(base="<GUID=%s>" % src_guid, res2 = self.ldb_dc2.search(base="<GUID=%s>" % src_guid,
scope=SCOPE_BASE, attrs=["member"]) scope=SCOPE_BASE, attrs=["member"])
# our test user should still be a member of the group (check both DCs agree) # our test user should still be a member of the group (check both
self.assertTrue("member" in res1[0], "Expected member attribute missing") # DCs agree)
self.assertTrue("member" in res1[0],
"Expected member attribute missing")
self.assert_attrs_match(res1, res2, "member", 1) self.assert_attrs_match(res1, res2, "member", 1)
def test_link_deletion_conflict(self): def test_link_deletion_conflict(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_link_deletion_conflict(sync_order=DC1_TO_DC2) self._test_link_deletion_conflict(sync_order=DC1_TO_DC2)
self._test_link_deletion_conflict(sync_order=DC2_TO_DC1) self._test_link_deletion_conflict(sync_order=DC2_TO_DC1)
@ -440,7 +461,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
""" """
target_dn = self.unique_dn("CN=target") target_dn = self.unique_dn("CN=target")
target_guid = self.add_object(self.ldb_dc1, target_dn, objectclass="user") target_guid = self.add_object(self.ldb_dc1, target_dn,
objectclass="user")
src_dn = self.unique_dn("CN=src") src_dn = self.unique_dn("CN=src")
src_guid = self.add_object(self.ldb_dc1, src_dn, objectclass="group") src_guid = self.add_object(self.ldb_dc1, src_dn, objectclass="group")
@ -463,9 +485,11 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# the object deletion should trump the link addition. # the object deletion should trump the link addition.
# Check the link no longer exists on the remaining object # Check the link no longer exists on the remaining object
res1 = self.ldb_dc1.search(base="<GUID=%s>" % search_guid, res1 = self.ldb_dc1.search(base="<GUID=%s>" % search_guid,
scope=SCOPE_BASE, attrs=["member", "memberOf"]) scope=SCOPE_BASE,
attrs=["member", "memberOf"])
res2 = self.ldb_dc2.search(base="<GUID=%s>" % search_guid, res2 = self.ldb_dc2.search(base="<GUID=%s>" % search_guid,
scope=SCOPE_BASE, attrs=["member", "memberOf"]) scope=SCOPE_BASE,
attrs=["member", "memberOf"])
self.assertFalse("member" in res1[0], "member attr shouldn't exist") self.assertFalse("member" in res1[0], "member attr shouldn't exist")
self.assertFalse("member" in res2[0], "member attr shouldn't exist") self.assertFalse("member" in res2[0], "member attr shouldn't exist")
@ -473,13 +497,18 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assertFalse("memberOf" in res2[0], "member attr shouldn't exist") self.assertFalse("memberOf" in res2[0], "member attr shouldn't exist")
def test_obj_deletion_conflict(self): def test_obj_deletion_conflict(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, del_target=True) # the conflict
self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, del_target=True) self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2,
del_target=True)
self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1,
del_target=True)
# and also try deleting the source object instead of the link target # and also try deleting the source object instead of the link target
self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2, del_target=False) self._test_obj_deletion_conflict(sync_order=DC1_TO_DC2,
self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1, del_target=False) del_target=False)
self._test_obj_deletion_conflict(sync_order=DC2_TO_DC1,
del_target=False)
def _test_full_sync_link_conflict(self, sync_order): def _test_full_sync_link_conflict(self, sync_order):
""" """
@ -501,11 +530,19 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# Do a couple of full syncs which should resolve the conflict # Do a couple of full syncs which should resolve the conflict
# (but only for one DC) # (but only for one DC)
if sync_order == DC1_TO_DC2: if sync_order == DC1_TO_DC2:
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, full_sync=True) self._net_drs_replicate(DC=self.dnsname_dc2,
self._net_drs_replicate(DC=self.dnsname_dc2, fromDC=self.dnsname_dc1, full_sync=True) fromDC=self.dnsname_dc1,
full_sync=True)
self._net_drs_replicate(DC=self.dnsname_dc2,
fromDC=self.dnsname_dc1,
full_sync=True)
else: else:
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, full_sync=True) self._net_drs_replicate(DC=self.dnsname_dc1,
self._net_drs_replicate(DC=self.dnsname_dc1, fromDC=self.dnsname_dc2, full_sync=True) fromDC=self.dnsname_dc2,
full_sync=True)
self._net_drs_replicate(DC=self.dnsname_dc1,
fromDC=self.dnsname_dc2,
full_sync=True)
# delete and re-add the link on one DC # delete and re-add the link on one DC
self.del_link_attr(self.ldb_dc1, src_dn, "member", target_dn) self.del_link_attr(self.ldb_dc1, src_dn, "member", target_dn)
@ -525,11 +562,13 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
scope=SCOPE_BASE, attrs=["member"]) scope=SCOPE_BASE, attrs=["member"])
# check the membership still exits (and both DCs agree) # check the membership still exits (and both DCs agree)
self.assertTrue("member" in res1[0], "Expected member attribute missing") self.assertTrue("member" in res1[0],
"Expected member attribute missing")
self.assert_attrs_match(res1, res2, "member", 1) self.assert_attrs_match(res1, res2, "member", 1)
def test_full_sync_link_conflict(self): def test_full_sync_link_conflict(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_full_sync_link_conflict(sync_order=DC1_TO_DC2) self._test_full_sync_link_conflict(sync_order=DC1_TO_DC2)
self._test_full_sync_link_conflict(sync_order=DC2_TO_DC1) self._test_full_sync_link_conflict(sync_order=DC2_TO_DC1)
@ -586,7 +625,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
def _test_conflict_single_valued_link_deleted_loser(self, sync_order): def _test_conflict_single_valued_link_deleted_loser(self, sync_order):
""" """
Tests a single-valued link conflict, where the losing link value is deleted. Tests a single-valued link conflict, where the losing link value is
deleted.
""" """
src_ou = self.unique_dn("OU=src") src_ou = self.unique_dn("OU=src")
src_guid = self.add_object(self.ldb_dc1, src_ou) src_guid = self.add_object(self.ldb_dc1, src_ou)
@ -599,9 +639,9 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
target1_guid = self.add_object(self.ldb_dc1, target1_ou) target1_guid = self.add_object(self.ldb_dc1, target1_ou)
target2_guid = self.add_object(self.ldb_dc2, target2_ou) target2_guid = self.add_object(self.ldb_dc2, target2_ou)
# add the links - we want the link to end up deleted on DC2, but active on # add the links - we want the link to end up deleted on DC2, but active
# DC1. DC1 has the better version and DC2 has the better timestamp - the # on DC1. DC1 has the better version and DC2 has the better timestamp -
# better version should win # the better version should win
self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou) self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou)
self.del_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou) self.del_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou)
self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou) self.add_link_attr(self.ldb_dc1, src_ou, "managedBy", target1_ou)
@ -689,7 +729,8 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self._check_replicated_links(src_ou, [link1, link2]) self._check_replicated_links(src_ou, [link1, link2])
def test_conflict_existing_single_valued_link(self): def test_conflict_existing_single_valued_link(self):
# repeat the test twice, to give each DC a chance to resolve the conflict # repeat the test twice, to give each DC a chance to resolve
# the conflict
self._test_conflict_existing_single_valued_link(sync_order=DC1_TO_DC2) self._test_conflict_existing_single_valued_link(sync_order=DC1_TO_DC2)
self._test_conflict_existing_single_valued_link(sync_order=DC2_TO_DC1) self._test_conflict_existing_single_valued_link(sync_order=DC2_TO_DC1)
@ -707,7 +748,7 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
# get the link info via replication # get the link info via replication
ctr6 = self._get_replication(drsuapi.DRSUAPI_DRS_WRIT_REP, ctr6 = self._get_replication(drsuapi.DRSUAPI_DRS_WRIT_REP,
dest_dsa=None, dest_dsa=None,
drs_error=drsuapi.DRSUAPI_EXOP_ERR_SUCCESS, drs_error=DRSUAPI_EXOP_ERR_SUCCESS,
exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ, exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
highwatermark=self.zero_highwatermark(), highwatermark=self.zero_highwatermark(),
nc_dn_str=src_ou) nc_dn_str=src_ou)
@ -715,6 +756,6 @@ class DrsReplicaLinkConflictTestCase(drs_base.DrsBaseTestCase):
self.assertTrue(ctr6.linked_attributes_count == 1, self.assertTrue(ctr6.linked_attributes_count == 1,
"DRS didn't return a link") "DRS didn't return a link")
link = ctr6.linked_attributes[0] link = ctr6.linked_attributes[0]
self.assertTrue(link.meta_data.version == 1, rcvd_version = link.meta_data.version
"Link version started from %u, not 1" % link.meta_data.version) self.assertTrue(rcvd_version == 1,
"Link version started from %u, not 1" % rcvd_version)