1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-24 02:04:21 +03:00
Björn Baumbach f13b1da046 test: samba-tool user show: Test ';format=[GeneralizedTime,UnixTime,TimeSpec] attributes
Pair-Programmed-With: Stefan Metzmacher <metze@samba.org>

Signed-off-by: Björn Baumbach <bb@sernet.de>
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
2021-03-01 03:50:35 +00:00

1185 lines
58 KiB
Python

# Unix SMB/CIFS implementation.
# Copyright (C) Sean Dague <sdague@linux.vnet.ibm.com> 2011
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import time
import base64
import ldb
from samba.tests.samba_tool.base import SambaToolCmdTest
from samba import (
credentials,
nttime2unix,
dsdb
)
from samba.ndr import ndr_unpack
from samba.dcerpc import drsblobs
from samba.common import get_bytes
from samba.common import get_string
from samba.tests import env_loadparm
class UserCmdTestCase(SambaToolCmdTest):
"""Tests for samba-tool user subcommands"""
users = []
samdb = None
def setUp(self):
super(UserCmdTestCase, self).setUp()
self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
# Modify the default template homedir
lp = self.get_loadparm()
self.template_homedir = lp.get('template homedir')
lp.set('template homedir', '/home/test/%D/%U')
self.users = []
self.users.append(self._randomUser({"name": "sambatool1", "company": "comp1"}))
self.users.append(self._randomUser({"name": "sambatool2", "company": "comp1"}))
self.users.append(self._randomUser({"name": "sambatool3", "company": "comp2"}))
self.users.append(self._randomUser({"name": "sambatool4", "company": "comp2"}))
self.users.append(self._randomPosixUser({"name": "posixuser1"}))
self.users.append(self._randomPosixUser({"name": "posixuser2"}))
self.users.append(self._randomPosixUser({"name": "posixuser3"}))
self.users.append(self._randomPosixUser({"name": "posixuser4"}))
self.users.append(self._randomUnixUser({"name": "unixuser1"}))
self.users.append(self._randomUnixUser({"name": "unixuser2"}))
self.users.append(self._randomUnixUser({"name": "unixuser3"}))
self.users.append(self._randomUnixUser({"name": "unixuser4"}))
# setup the 12 users and ensure they are correct
for user in self.users:
(result, out, err) = user["createUserFn"](user)
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
if 'unix' in user["name"]:
self.assertIn("Modified User '%s' successfully" % user["name"],
out)
else:
self.assertIn("User '%s' added successfully" % user["name"],
out)
user["checkUserFn"](user)
def tearDown(self):
super(UserCmdTestCase, self).tearDown()
# clean up all the left over users, just in case
for user in self.users:
if self._find_user(user["name"]):
self.runsubcmd("user", "delete", user["name"])
lp = env_loadparm()
# second run of this test
# the cache is still there and '--cache-ldb-initialize'
# will fail
cachedb = lp.private_path("user-syncpasswords-cache.ldb")
if os.path.exists(cachedb):
os.remove(cachedb)
lp.set('template homedir', self.template_homedir)
def test_newuser(self):
# try to add all the users again, this should fail
for user in self.users:
(result, out, err) = self._create_user(user)
self.assertCmdFail(result, "Ensure that create user fails")
self.assertIn("LDAP error 68 LDAP_ENTRY_ALREADY_EXISTS", err)
# try to delete all the 4 users we just added
for user in self.users:
(result, out, err) = self.runsubcmd("user", "delete", user["name"])
self.assertCmdSuccess(result, out, err, "Can we delete users")
found = self._find_user(user["name"])
self.assertIsNone(found)
# test adding users with --use-username-as-cn
for user in self.users:
(result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
"--use-username-as-cn",
"--surname=%s" % user["surname"],
"--given-name=%s" % user["given-name"],
"--job-title=%s" % user["job-title"],
"--department=%s" % user["department"],
"--description=%s" % user["description"],
"--company=%s" % user["company"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn("User '%s' added successfully" % user["name"], out)
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("cn"), "%(name)s" % user)
self.assertEqual("%s" % found.get("name"), "%(name)s" % user)
def _verify_supplementalCredentials(self, ldif,
min_packages=3,
max_packages=6):
msgs = self.samdb.parse_ldif(ldif)
(changetype, obj) = next(msgs)
self.assertIn("supplementalCredentials", obj, "supplementalCredentials attribute required")
sc_blob = obj["supplementalCredentials"][0]
sc = ndr_unpack(drsblobs.supplementalCredentialsBlob, sc_blob)
self.assertGreaterEqual(sc.sub.num_packages,
min_packages, "min_packages check")
self.assertLessEqual(sc.sub.num_packages,
max_packages, "max_packages check")
if max_packages == 0:
return
def find_package(packages, name, start_idx=0):
for i in range(start_idx, len(packages)):
if packages[i].name == name:
return (i, packages[i])
return (None, None)
# The ordering is this
#
# Primary:Kerberos-Newer-Keys (optional)
# Primary:Kerberos
# Primary:WDigest
# Primary:CLEARTEXT (optional)
# Primary:SambaGPG (optional)
#
# And the 'Packages' package is insert before the last
# other package.
nidx = 0
(pidx, pp) = find_package(sc.sub.packages, "Packages", start_idx=nidx)
self.assertIsNotNone(pp, "Packages required")
self.assertEqual(pidx + 1, sc.sub.num_packages - 1,
"Packages needs to be at num_packages - 1")
(knidx, knp) = find_package(sc.sub.packages, "Primary:Kerberos-Newer-Keys",
start_idx=nidx)
if knidx is not None:
self.assertEqual(knidx, nidx, "Primary:Kerberos-Newer-Keys at wrong position")
nidx = nidx + 1
if nidx == pidx:
nidx = nidx + 1
(kidx, kp) = find_package(sc.sub.packages, "Primary:Kerberos",
start_idx=nidx)
self.assertIsNotNone(pp, "Primary:Kerberos required")
self.assertEqual(kidx, nidx, "Primary:Kerberos at wrong position")
nidx = nidx + 1
if nidx == pidx:
nidx = nidx + 1
(widx, wp) = find_package(sc.sub.packages, "Primary:WDigest",
start_idx=nidx)
self.assertIsNotNone(pp, "Primary:WDigest required")
self.assertEqual(widx, nidx, "Primary:WDigest at wrong position")
nidx = nidx + 1
if nidx == pidx:
nidx = nidx + 1
(cidx, cp) = find_package(sc.sub.packages, "Primary:CLEARTEXT",
start_idx=nidx)
if cidx is not None:
self.assertEqual(cidx, nidx, "Primary:CLEARTEXT at wrong position")
nidx = nidx + 1
if nidx == pidx:
nidx = nidx + 1
(gidx, gp) = find_package(sc.sub.packages, "Primary:SambaGPG",
start_idx=nidx)
if gidx is not None:
self.assertEqual(gidx, nidx, "Primary:SambaGPG at wrong position")
nidx = nidx + 1
if nidx == pidx:
nidx = nidx + 1
self.assertEqual(nidx, sc.sub.num_packages, "Unknown packages found")
def test_setpassword(self):
for user in self.users:
newpasswd = self.random_password(16)
(result, out, err) = self.runsubcmd("user", "setpassword",
user["name"],
"--newpassword=%s" % newpasswd,
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEqual(err, "", "setpassword with url")
self.assertMatch(out, "Changed password OK", "setpassword with url")
attributes = "sAMAccountName,unicodePwd,supplementalCredentials,virtualClearTextUTF8,virtualClearTextUTF16,virtualSSHA,virtualSambaGPG"
(result, out, err) = self.runsubcmd("user", "syncpasswords",
"--cache-ldb-initialize",
"--attributes=%s" % attributes,
"--decrypt-samba-gpg")
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --cache-ldb-initialize runs")
self.assertEqual(err, "", "getpassword without url")
cache_attrs = {
"objectClass": {"value": "userSyncPasswords"},
"samdbUrl": {},
"dirsyncFilter": {},
"dirsyncAttribute": {},
"dirsyncControl": {"value": "dirsync:1:0:0"},
"passwordAttribute": {},
"decryptSambaGPG": {},
"currentTime": {},
}
for a in cache_attrs.keys():
v = cache_attrs[a].get("value", "")
self.assertMatch(out, "%s: %s" % (a, v),
"syncpasswords --cache-ldb-initialize: %s: %s out[%s]" % (a, v, out))
(result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
self.assertEqual(err, "", "syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
"syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
for user in self.users:
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
"syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
for user in self.users:
newpasswd = self.random_password(16)
creds = credentials.Credentials()
creds.set_anonymous()
creds.set_password(newpasswd)
nthash = creds.get_nt_hash()
unicodePwd = base64.b64encode(creds.get_nt_hash()).decode('utf8')
virtualClearTextUTF8 = base64.b64encode(get_bytes(newpasswd)).decode('utf8')
virtualClearTextUTF16 = base64.b64encode(get_string(newpasswd).encode('utf-16-le')).decode('utf8')
(result, out, err) = self.runsubcmd("user", "setpassword",
user["name"],
"--newpassword=%s" % newpasswd)
self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEqual(err, "", "setpassword without url")
self.assertMatch(out, "Changed password OK", "setpassword without url")
(result, out, err) = self.runsubcmd("user", "syncpasswords", "--no-wait")
self.assertCmdSuccess(result, out, err, "Ensure syncpasswords --no-wait runs")
self.assertEqual(err, "", "syncpasswords --no-wait")
self.assertMatch(out, "dirsync_loop(): results 0",
"syncpasswords --no-wait: 'dirsync_loop(): results 0': out[%s]" % (out))
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
"syncpasswords --no-wait: 'sAMAccountName': %s out[%s]" % (user["name"], out))
self.assertMatch(out, "# unicodePwd::: REDACTED SECRET ATTRIBUTE",
"getpassword '# unicodePwd::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
"getpassword unicodePwd: out[%s]" % out)
self.assertMatch(out, "# supplementalCredentials::: REDACTED SECRET ATTRIBUTE",
"getpassword '# supplementalCredentials::: REDACTED SECRET ATTRIBUTE': out[%s]" % out)
self.assertMatch(out, "supplementalCredentials:: ",
"getpassword supplementalCredentials: out[%s]" % out)
if "virtualSambaGPG:: " in out:
self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
"getpassword virtualClearTextUTF8: out[%s]" % out)
self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
"getpassword virtualClearTextUTF16: out[%s]" % out)
self.assertMatch(out, "virtualSSHA: ",
"getpassword virtualSSHA: out[%s]" % out)
(result, out, err) = self.runsubcmd("user", "getpassword",
user["name"],
"--attributes=%s" % attributes,
"--decrypt-samba-gpg")
self.assertCmdSuccess(result, out, err, "Ensure getpassword runs")
self.assertEqual(err, "", "getpassword without url")
self.assertMatch(out, "Got password OK", "getpassword without url")
self.assertMatch(out, "sAMAccountName: %s" % (user["name"]),
"getpassword: 'sAMAccountName': %s out[%s]" % (user["name"], out))
self.assertMatch(out, "unicodePwd:: %s" % unicodePwd,
"getpassword unicodePwd: out[%s]" % out)
self.assertMatch(out, "supplementalCredentials:: ",
"getpassword supplementalCredentials: out[%s]" % out)
self._verify_supplementalCredentials(out.replace("\nGot password OK\n", ""))
if "virtualSambaGPG:: " in out:
self.assertMatch(out, "virtualClearTextUTF8:: %s" % virtualClearTextUTF8,
"getpassword virtualClearTextUTF8: out[%s]" % out)
self.assertMatch(out, "virtualClearTextUTF16:: %s" % virtualClearTextUTF16,
"getpassword virtualClearTextUTF16: out[%s]" % out)
self.assertMatch(out, "virtualSSHA: ",
"getpassword virtualSSHA: out[%s]" % out)
for user in self.users:
newpasswd = self.random_password(16)
(result, out, err) = self.runsubcmd("user", "setpassword",
user["name"],
"--newpassword=%s" % newpasswd,
"--must-change-at-next-login",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Ensure setpassword runs")
self.assertEqual(err, "", "setpassword with forced change")
self.assertMatch(out, "Changed password OK", "setpassword with forced change")
def test_setexpiry(self):
for user in self.users:
twodays = time.time() + (2 * 24 * 60 * 60)
(result, out, err) = self.runsubcmd("user", "setexpiry", user["name"],
"--days=2",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Can we run setexpiry with names")
self.assertIn("Expiry for user '%s' set to 2 days." % user["name"], out)
found = self._find_user(user["name"])
expires = nttime2unix(int("%s" % found.get("accountExpires")))
self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
# TODO: renable this after the filter case is sorted out
if "filters are broken, bail now":
return
# now run the expiration based on a filter
fourdays = time.time() + (4 * 24 * 60 * 60)
(result, out, err) = self.runsubcmd("user", "setexpiry",
"--filter", "(&(objectClass=user)(company=comp2))",
"--days=4",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Can we run setexpiry with a filter")
for user in self.users:
found = self._find_user(user["name"])
if ("%s" % found.get("company")) == "comp2":
expires = nttime2unix(int("%s" % found.get("accountExpires")))
self.assertWithin(expires, fourdays, 5, "Ensure account expires is within 5 seconds of the expected time")
else:
expires = nttime2unix(int("%s" % found.get("accountExpires")))
self.assertWithin(expires, twodays, 5, "Ensure account expires is within 5 seconds of the expected time")
def test_list(self):
(result, out, err) = self.runsubcmd("user", "list",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running list")
search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
(ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
userlist = self.samdb.search(base=self.samdb.domain_dn(),
scope=ldb.SCOPE_SUBTREE,
expression=search_filter,
attrs=["samaccountname"])
self.assertTrue(len(userlist) > 0, "no users found in samdb")
for userobj in userlist:
name = str(userobj.get("samaccountname", idx=0))
found = self.assertMatch(out, name,
"user '%s' not found" % name)
def test_list_base_dn(self):
base_dn = "CN=Users"
(result, out, err) = self.runsubcmd("user", "list", "-b", base_dn,
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running list")
search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
(ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
userlist = self.samdb.search(base=self.samdb.normalize_dn_in_domain(base_dn),
scope=ldb.SCOPE_SUBTREE,
expression=search_filter,
attrs=["samaccountname"])
self.assertTrue(len(userlist) > 0, "no users found in samdb")
for userobj in userlist:
name = str(userobj.get("samaccountname", idx=0))
found = self.assertMatch(out, name,
"user '%s' not found" % name)
def test_list_full_dn(self):
(result, out, err) = self.runsubcmd("user", "list", "--full-dn",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running list")
search_filter = ("(&(objectClass=user)(userAccountControl:%s:=%u))" %
(ldb.OID_COMPARATOR_AND, dsdb.UF_NORMAL_ACCOUNT))
userlist = self.samdb.search(base=self.samdb.domain_dn(),
scope=ldb.SCOPE_SUBTREE,
expression=search_filter,
attrs=["dn"])
self.assertTrue(len(userlist) > 0, "no users found in samdb")
for userobj in userlist:
name = str(userobj.get("dn", idx=0))
found = self.assertMatch(out, name,
"user '%s' not found" % name)
def test_list_hide_expired(self):
expire_username = "expireUser"
expire_user = self._randomUser({"name": expire_username})
self._create_user(expire_user)
(result, out, err) = self.runsubcmd(
"user",
"list",
"--hide-expired",
"-H",
"ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running list")
self.assertTrue(expire_username in out,
"user '%s' not found" % expire_username)
# user will be expired one second ago
self.samdb.setexpiry(
"(sAMAccountname=%s)" % expire_username,
-1,
False)
(result, out, err) = self.runsubcmd(
"user",
"list",
"--hide-expired",
"-H",
"ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running list")
self.assertFalse(expire_username in out,
"user '%s' found" % expire_username)
self.samdb.deleteuser(expire_username)
def test_list_hide_disabled(self):
disable_username = "disableUser"
disable_user = self._randomUser({"name": disable_username})
self._create_user(disable_user)
(result, out, err) = self.runsubcmd(
"user",
"list",
"--hide-disabled",
"-H",
"ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running list")
self.assertTrue(disable_username in out,
"user '%s' not found" % disable_username)
self.samdb.disable_account("(sAMAccountname=%s)" % disable_username)
(result, out, err) = self.runsubcmd(
"user",
"list",
"--hide-disabled",
"-H",
"ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running list")
self.assertFalse(disable_username in out,
"user '%s' found" % disable_username)
self.samdb.deleteuser(disable_username)
def test_show(self):
for user in self.users:
(result, out, err) = self.runsubcmd(
"user", "show", user["name"],
"--attributes=sAMAccountName,company",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running show")
expected_out = """dn: CN=%s %s,CN=Users,%s
company: %s
sAMAccountName: %s
""" % (user["given-name"], user["surname"], self.samdb.domain_dn(),
user["company"], user["name"])
self.assertEqual(out, expected_out,
"Unexpected show output for user '%s'" %
user["name"])
time_attrs = [
"name", # test that invalid values are just ignored
"whenCreated",
"whenChanged",
"accountExpires",
"badPasswordTime",
"lastLogoff",
"lastLogon",
"lastLogonTimestamp",
"lockoutTime",
"msDS-UserPasswordExpiryTimeComputed",
"pwdLastSet",
]
attrs = []
for ta in time_attrs:
attrs.append(ta)
for fm in ["GeneralizedTime", "UnixTime", "TimeSpec"]:
attrs.append("%s;format=%s" % (ta, fm))
(result, out, err) = self.runsubcmd(
"user", "show", user["name"],
"--attributes=%s" % ",".join(attrs),
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err, "Error running show")
self.assertIn(";format=GeneralizedTime", out)
self.assertIn(";format=UnixTime", out)
self.assertIn(";format=TimeSpec", out)
self.assertIn("name: ", out)
self.assertNotIn("name;format=GeneralizedTime: ", out)
self.assertNotIn("name;format=UnixTime: ", out)
self.assertNotIn("name;format=TimeSpec: ", out)
self.assertIn("whenCreated: 20", out)
self.assertIn("whenCreated;format=GeneralizedTime: 20", out)
self.assertIn("whenCreated;format=UnixTime: 1", out)
self.assertIn("whenCreated;format=TimeSpec: 1", out)
self.assertIn("whenChanged: 20", out)
self.assertIn("whenChanged;format=GeneralizedTime: 20", out)
self.assertIn("whenChanged;format=UnixTime: 1", out)
self.assertIn("whenChanged;format=TimeSpec: 1", out)
self.assertIn("accountExpires: 9223372036854775807", out)
self.assertNotIn("accountExpires;format=GeneralizedTime: ", out)
self.assertNotIn("accountExpires;format=UnixTime: ", out)
self.assertNotIn("accountExpires;format=TimeSpec: ", out)
self.assertIn("badPasswordTime: 0", out)
self.assertNotIn("badPasswordTime;format=GeneralizedTime: ", out)
self.assertNotIn("badPasswordTime;format=UnixTime: ", out)
self.assertNotIn("badPasswordTime;format=TimeSpec: ", out)
self.assertIn("lastLogoff: 0", out)
self.assertNotIn("lastLogoff;format=GeneralizedTime: ", out)
self.assertNotIn("lastLogoff;format=UnixTime: ", out)
self.assertNotIn("lastLogoff;format=TimeSpec: ", out)
self.assertIn("lastLogon: 0", out)
self.assertNotIn("lastLogon;format=GeneralizedTime: ", out)
self.assertNotIn("lastLogon;format=UnixTime: ", out)
self.assertNotIn("lastLogon;format=TimeSpec: ", out)
# If a specified attribute is not available on a user object
# it's silently omitted.
self.assertNotIn("lastLogonTimestamp:", out)
self.assertNotIn("lockoutTime:", out)
self.assertIn("msDS-UserPasswordExpiryTimeComputed: 1", out)
self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime: 20", out)
self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime: 1", out)
self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec: 1", out)
self.assertIn("pwdLastSet: 1", out)
self.assertIn("pwdLastSet;format=GeneralizedTime: 20", out)
self.assertIn("pwdLastSet;format=UnixTime: 1", out)
self.assertIn("pwdLastSet;format=TimeSpec: 1", out)
out_msgs = self.samdb.parse_ldif(out)
out_msg = next(out_msgs)[1]
self.assertIn("whenCreated", out_msg)
when_created_str = str(out_msg["whenCreated"][0])
self.assertIn("whenCreated;format=GeneralizedTime", out_msg)
self.assertEqual(str(out_msg["whenCreated;format=GeneralizedTime"][0]), when_created_str)
when_created_time = ldb.string_to_time(when_created_str)
self.assertIn("whenCreated;format=UnixTime", out_msg)
self.assertEqual(str(out_msg["whenCreated;format=UnixTime"][0]), str(when_created_time))
self.assertIn("whenCreated;format=TimeSpec", out_msg)
self.assertEqual(str(out_msg["whenCreated;format=TimeSpec"][0]),
"%d.000000000" % (when_created_time))
self.assertIn("whenChanged", out_msg)
when_changed_str = str(out_msg["whenChanged"][0])
self.assertIn("whenChanged;format=GeneralizedTime", out_msg)
self.assertEqual(str(out_msg["whenChanged;format=GeneralizedTime"][0]), when_changed_str)
when_changed_time = ldb.string_to_time(when_changed_str)
self.assertIn("whenChanged;format=UnixTime", out_msg)
self.assertEqual(str(out_msg["whenChanged;format=UnixTime"][0]), str(when_changed_time))
self.assertIn("whenChanged;format=TimeSpec", out_msg)
self.assertEqual(str(out_msg["whenChanged;format=TimeSpec"][0]),
"%d.000000000" % (when_changed_time))
self.assertIn("pwdLastSet;format=GeneralizedTime", out_msg)
pwd_last_set_str = str(out_msg["pwdLastSet;format=GeneralizedTime"][0])
pwd_last_set_time = ldb.string_to_time(pwd_last_set_str)
self.assertIn("pwdLastSet;format=UnixTime", out_msg)
self.assertEqual(str(out_msg["pwdLastSet;format=UnixTime"][0]), str(pwd_last_set_time))
self.assertIn("pwdLastSet;format=TimeSpec", out_msg)
self.assertIn("%d." % pwd_last_set_time, str(out_msg["pwdLastSet;format=TimeSpec"][0]))
self.assertNotIn(".000000000", str(out_msg["pwdLastSet;format=TimeSpec"][0]))
# assert that the pwd has been set in the minute after user creation
self.assertGreaterEqual(pwd_last_set_time, when_created_time)
self.assertLess(pwd_last_set_time, when_created_time + 60)
self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime", out_msg)
pwd_expires_str = str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=GeneralizedTime"][0])
pwd_expires_time = ldb.string_to_time(pwd_expires_str)
self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=UnixTime", out_msg)
self.assertEqual(str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=UnixTime"][0]), str(pwd_expires_time))
self.assertIn("msDS-UserPasswordExpiryTimeComputed;format=TimeSpec", out_msg)
self.assertIn("%d." % pwd_expires_time, str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
self.assertNotIn(".000000000", str(out_msg["msDS-UserPasswordExpiryTimeComputed;format=TimeSpec"][0]))
# assert that the pwd expires after it was set
self.assertGreater(pwd_expires_time, pwd_last_set_time)
def test_move(self):
full_ou_dn = str(self.samdb.normalize_dn_in_domain("OU=movetest"))
(result, out, err) = self.runsubcmd("ou", "add", full_ou_dn)
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "There shouldn't be any error message")
self.assertIn('Added ou "%s"' % full_ou_dn, out)
for user in self.users:
(result, out, err) = self.runsubcmd(
"user", "move", user["name"], full_ou_dn)
self.assertCmdSuccess(result, out, err, "Error running move")
self.assertIn('Moved user "%s" into "%s"' %
(user["name"], full_ou_dn), out)
# Should fail as users objects are in OU
(result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
self.assertCmdFail(result)
self.assertIn(("subtree_delete: Unable to delete a non-leaf node "
"(it has %d children)!") % len(self.users), err)
for user in self.users:
new_dn = "CN=Users,%s" % self.samdb.domain_dn()
(result, out, err) = self.runsubcmd(
"user", "move", user["name"], new_dn)
self.assertCmdSuccess(result, out, err, "Error running move")
self.assertIn('Moved user "%s" into "%s"' %
(user["name"], new_dn), out)
(result, out, err) = self.runsubcmd("ou", "delete", full_ou_dn)
self.assertCmdSuccess(result, out, err,
"Failed to delete ou '%s'" % full_ou_dn)
def test_rename_surname_initials_givenname(self):
"""rename the existing surname and given name and add missing
initials, then remove them, for all users"""
for user in self.users:
new_givenname = "new_given_name_of_" + user["name"]
new_initials = "A"
new_surname = "new_surname_of_" + user["name"]
found = self._find_user(user["name"])
old_cn = str(found.get("cn"))
# rename given name, initials and surname
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--surname=%s" % new_surname,
"--initials=%s" % new_initials,
"--given-name=%s" % new_givenname)
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("givenName"), new_givenname)
self.assertEqual("%s" % found.get("initials"), new_initials)
self.assertEqual("%s" % found.get("sn"), new_surname)
self.assertEqual("%s" % found.get("name"),
"%s %s. %s" % (new_givenname, new_initials, new_surname))
self.assertEqual("%s" % found.get("cn"),
"%s %s. %s" % (new_givenname, new_initials, new_surname))
# remove given name, initials and surname
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--surname=",
"--initials=",
"--given-name=")
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(user["name"])
self.assertEqual(found.get("givenName"), None)
self.assertEqual(found.get("initials"), None)
self.assertEqual(found.get("sn"), None)
self.assertEqual("%s" % found.get("cn"), user["name"])
# reset changes (initials are removed)
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--surname=%(surname)s" % user,
"--given-name=%(given-name)s" % user)
self.assertCmdSuccess(result, out, err)
if old_cn:
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--force-new-cn=%s" % old_cn)
def test_rename_cn_samaccountname(self):
"""rename and try to remove the cn and the samaccount of all users"""
for user in self.users:
new_cn = "new_cn_of_" + user["name"]
new_samaccountname = "new_samaccount_of_" + user["name"]
new_surname = "new_surname_of_" + user["name"]
# rename cn
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--samaccountname=%s"
% new_samaccountname,
"--force-new-cn=%s" % new_cn)
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(new_samaccountname)
self.assertEqual("%s" % found.get("cn"), new_cn)
self.assertEqual("%s" % found.get("sAMAccountName"),
new_samaccountname)
# changing the surname has no effect to the cn
(result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
"--surname=%s" % new_surname)
self.assertCmdSuccess(result, out, err)
found = self._find_user(new_samaccountname)
self.assertEqual("%s" % found.get("cn"), new_cn)
# trying to remove cn (throws an error)
(result, out, err) = self.runsubcmd("user", "rename",
new_samaccountname,
"--force-new-cn=")
self.assertCmdFail(result)
self.assertIn('Failed to rename user', err)
self.assertIn("delete protected attribute", err)
# trying to remove the samccountname (throws an error)
(result, out, err) = self.runsubcmd("user", "rename",
new_samaccountname,
"--samaccountname=")
self.assertCmdFail(result)
self.assertIn('Failed to rename user', err)
self.assertIn('delete protected attribute', err)
# reset changes (cn must be the name)
(result, out, err) = self.runsubcmd("user", "rename", new_samaccountname,
"--samaccountname=%(name)s"
% user,
"--force-new-cn=%(name)s" % user)
self.assertCmdSuccess(result, out, err)
def test_rename_standard_cn(self):
"""reset the cn of all users to the standard"""
for user in self.users:
new_cn = "new_cn_of_" + user["name"]
new_givenname = "new_given_name_of_" + user["name"]
new_initials = "A"
new_surname = "new_surname_of_" + user["name"]
# set different cn
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--force-new-cn=%s" % new_cn)
self.assertCmdSuccess(result, out, err)
# remove given name, initials and surname
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--surname=",
"--initials=",
"--given-name=")
self.assertCmdSuccess(result, out, err)
# reset the CN (no given name, initials or surname --> samaccountname)
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--reset-cn")
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("cn"), user["name"])
# set given name, initials and surname and set different cn
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--force-new-cn=%s" % new_cn,
"--surname=%s" % new_surname,
"--initials=%s" % new_initials,
"--given-name=%s" % new_givenname)
self.assertCmdSuccess(result, out, err)
# reset the CN (given name, initials or surname are given --> given name)
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--reset-cn")
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("cn"),
"%s %s. %s" % (new_givenname, new_initials, new_surname))
# reset changes
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--reset-cn",
"--initials=",
"--surname=%(surname)s" % user,
"--given-name=%(given-name)s" % user)
self.assertCmdSuccess(result, out, err)
def test_rename_mailaddress_displayname(self):
for user in self.users:
new_mail = "new_mailaddress_of_" + user["name"]
new_displayname = "new displayname of " + user["name"]
# change mail and displayname
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--mail-address=%s"
% new_mail,
"--display-name=%s"
% new_displayname)
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("mail"), new_mail)
self.assertEqual("%s" % found.get("displayName"), new_displayname)
# remove mail and displayname
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--mail-address=",
"--display-name=")
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(user["name"])
self.assertEqual(found.get("mail"), None)
self.assertEqual(found.get("displayName"), None)
def test_rename_upn(self):
"""rename upn of all users"""
for user in self.users:
found = self._find_user(user["name"])
old_upn = "%s" % found.get("userPrincipalName")
valid_suffix = old_upn.split('@')[1] # samba.example.com
valid_new_upn = "new_%s@%s" % (user["name"], valid_suffix)
invalid_new_upn = "%s@invalid.suffix" + user["name"]
# trying to set invalid upn
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--upn=%s"
% invalid_new_upn)
self.assertCmdFail(result)
self.assertIn('is not a valid upn', err)
# set valid upn
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--upn=%s"
% valid_new_upn)
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn('successfully', out)
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("userPrincipalName"), valid_new_upn)
# trying to remove upn
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--upn=%s")
self.assertCmdFail(result)
self.assertIn('is not a valid upn', err)
# reset upn
(result, out, err) = self.runsubcmd("user", "rename", user["name"],
"--upn=%s" % old_upn)
self.assertCmdSuccess(result, out, err)
def test_getpwent(self):
try:
import pwd
except ImportError:
self.skipTest("Skipping getpwent test, no 'pwd' module available")
return
# get the current user's data for the test
uid = os.geteuid()
try:
u = pwd.getpwuid(uid)
except KeyError:
self.skipTest("Skipping getpwent test, current EUID not found in NSS")
return
# samba-tool user create command didn't support users with empty gecos if none is
# specified on the command line and the user hasn't one in the passwd file it
# will fail, so let's add some contents
gecos = u[4]
if (gecos is None or len(gecos) == 0):
gecos = "Foo GECOS"
user = self._randomPosixUser({
"name": u[0],
"uid": u[0],
"uidNumber": u[2],
"gidNumber": u[3],
"gecos": gecos,
"loginShell": u[6],
})
# check if --rfc2307-from-nss sets the same values as we got from pwd.getpwuid()
(result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
"--surname=%s" % user["surname"],
"--given-name=%s" % user["given-name"],
"--job-title=%s" % user["job-title"],
"--department=%s" % user["department"],
"--description=%s" % user["description"],
"--company=%s" % user["company"],
"--gecos=%s" % user["gecos"],
"--rfc2307-from-nss",
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn("User '%s' added successfully" % user["name"], out)
self._check_posix_user(user)
self.runsubcmd("user", "delete", user["name"])
# Check if overriding the attributes from NSS with explicit values works
#
# get a user with all random posix attributes
user = self._randomPosixUser({"name": u[0]})
# create a user with posix attributes from nss but override all of them with the
# random ones just obtained
(result, out, err) = self.runsubcmd("user", "create", user["name"], user["password"],
"--surname=%s" % user["surname"],
"--given-name=%s" % user["given-name"],
"--job-title=%s" % user["job-title"],
"--department=%s" % user["department"],
"--description=%s" % user["description"],
"--company=%s" % user["company"],
"--rfc2307-from-nss",
"--gecos=%s" % user["gecos"],
"--login-shell=%s" % user["loginShell"],
"--uid=%s" % user["uid"],
"--uid-number=%s" % user["uidNumber"],
"--gid-number=%s" % user["gidNumber"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
self.assertCmdSuccess(result, out, err)
self.assertEqual(err, "", "Shouldn't be any error messages")
self.assertIn("User '%s' added successfully" % user["name"], out)
self._check_posix_user(user)
self.runsubcmd("user", "delete", user["name"])
# Test: samba-tool user unlock
# This test does not verify that the command unlocks the user, it just
# tests the command itself. The unlock test, which unlocks locked users,
# is located in the 'samba4.ldap.password_lockout' test in
# source4/dsdb/tests/python/password_lockout.py
def test_unlock(self):
# try to unlock a nonexistent user, this should fail
nonexistentusername = "userdoesnotexist"
(result, out, err) = self.runsubcmd(
"user", "unlock", nonexistentusername)
self.assertCmdFail(result, "Ensure that unlock nonexistent user fails")
self.assertIn("Failed to unlock user '%s'" % nonexistentusername, err)
self.assertIn("Unable to find user", err)
# try to unlock with insufficient permissions, this should fail
unprivileged_username = "unprivilegedunlockuser"
unlocktest_username = "usertounlock"
self.runsubcmd("user", "add", unprivileged_username, "Passw0rd")
self.runsubcmd("user", "add", unlocktest_username, "Passw0rd")
(result, out, err) = self.runsubcmd(
"user", "unlock", unlocktest_username,
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (unprivileged_username,
"Passw0rd"))
self.assertCmdFail(result, "Fail with LDAP_INSUFFICIENT_ACCESS_RIGHTS")
self.assertIn("Failed to unlock user '%s'" % unlocktest_username, err)
self.assertIn("LDAP error 50 LDAP_INSUFFICIENT_ACCESS_RIGHTS", err)
self.runsubcmd("user", "delete", unprivileged_username)
self.runsubcmd("user", "delete", unlocktest_username)
# run unlock against test users
for user in self.users:
(result, out, err) = self.runsubcmd(
"user", "unlock", user["name"])
self.assertCmdSuccess(result, out, err, "Error running user unlock")
self.assertEqual(err, "", "Shouldn't be any error messages")
def _randomUser(self, base={}):
"""create a user with random attribute values, you can specify base attributes"""
user = {
"name": self.randomName(),
"password": self.random_password(16),
"surname": self.randomName(),
"given-name": self.randomName(),
"job-title": self.randomName(),
"department": self.randomName(),
"company": self.randomName(),
"description": self.randomName(count=100),
"createUserFn": self._create_user,
"checkUserFn": self._check_user,
}
user.update(base)
return user
def _randomPosixUser(self, base={}):
"""create a user with random attribute values and additional RFC2307
attributes, you can specify base attributes"""
user = self._randomUser({})
user.update(base)
posixAttributes = {
"uid": self.randomName(),
"loginShell": self.randomName(),
"gecos": self.randomName(),
"uidNumber": self.randomXid(),
"gidNumber": self.randomXid(),
"createUserFn": self._create_posix_user,
"checkUserFn": self._check_posix_user,
}
user.update(posixAttributes)
user.update(base)
return user
def _randomUnixUser(self, base={}):
"""create a user with random attribute values and additional RFC2307
attributes, you can specify base attributes"""
user = self._randomUser({})
user.update(base)
posixAttributes = {
"uidNumber": self.randomXid(),
"gidNumber": self.randomXid(),
"uid": self.randomName(),
"loginShell": self.randomName(),
"gecos": self.randomName(),
"createUserFn": self._create_unix_user,
"checkUserFn": self._check_unix_user,
}
user.update(posixAttributes)
user.update(base)
return user
def _check_user(self, user):
""" check if a user from SamDB has the same attributes as its template """
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("name"), "%(given-name)s %(surname)s" % user)
self.assertEqual("%s" % found.get("title"), user["job-title"])
self.assertEqual("%s" % found.get("company"), user["company"])
self.assertEqual("%s" % found.get("description"), user["description"])
self.assertEqual("%s" % found.get("department"), user["department"])
def _check_posix_user(self, user):
""" check if a posix_user from SamDB has the same attributes as its template """
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
self.assertEqual("%s" % found.get("gecos"), user["gecos"])
self.assertEqual("%s" % found.get("uidNumber"), "%s" % user["uidNumber"])
self.assertEqual("%s" % found.get("gidNumber"), "%s" % user["gidNumber"])
self.assertEqual("%s" % found.get("uid"), user["uid"])
self._check_user(user)
def _check_unix_user(self, user):
""" check if a unix_user from SamDB has the same attributes as its
template """
found = self._find_user(user["name"])
self.assertEqual("%s" % found.get("loginShell"), user["loginShell"])
self.assertEqual("%s" % found.get("gecos"), user["gecos"])
self.assertEqual("%s" % found.get("uidNumber"), "%s" %
user["uidNumber"])
self.assertEqual("%s" % found.get("gidNumber"), "%s" %
user["gidNumber"])
self.assertEqual("%s" % found.get("uid"), user["uid"])
self.assertIn('/home/test/', "%s" % found.get("unixHomeDirectory"))
self._check_user(user)
def _create_user(self, user):
return self.runsubcmd("user", "add", user["name"], user["password"],
"--surname=%s" % user["surname"],
"--given-name=%s" % user["given-name"],
"--job-title=%s" % user["job-title"],
"--department=%s" % user["department"],
"--description=%s" % user["description"],
"--company=%s" % user["company"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
def _create_posix_user(self, user):
""" create a new user with RFC2307 attributes """
return self.runsubcmd("user", "create", user["name"], user["password"],
"--surname=%s" % user["surname"],
"--given-name=%s" % user["given-name"],
"--job-title=%s" % user["job-title"],
"--department=%s" % user["department"],
"--description=%s" % user["description"],
"--company=%s" % user["company"],
"--gecos=%s" % user["gecos"],
"--login-shell=%s" % user["loginShell"],
"--uid=%s" % user["uid"],
"--uid-number=%s" % user["uidNumber"],
"--gid-number=%s" % user["gidNumber"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
def _create_unix_user(self, user):
""" Add RFC2307 attributes to a user"""
self._create_user(user)
return self.runsubcmd("user", "addunixattrs", user["name"],
"%s" % user["uidNumber"],
"--gid-number=%s" % user["gidNumber"],
"--gecos=%s" % user["gecos"],
"--login-shell=%s" % user["loginShell"],
"--uid=%s" % user["uid"],
"-H", "ldap://%s" % os.environ["DC_SERVER"],
"-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"]))
def _find_user(self, name):
search_filter = "(&(sAMAccountName=%s)(objectCategory=%s,%s))" % (ldb.binary_encode(name), "CN=Person,CN=Schema,CN=Configuration", self.samdb.domain_dn())
userlist = self.samdb.search(base=self.samdb.domain_dn(),
scope=ldb.SCOPE_SUBTREE,
expression=search_filter)
if userlist:
return userlist[0]
else:
return None