1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-03 13:47:25 +03:00

auth logging tests: Clean up flake8 warnings

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Gary Lockyer 2018-04-30 10:35:25 +12:00 committed by Andrew Bartlett
parent fdf827553a
commit 8cf4e54696
6 changed files with 350 additions and 395 deletions

File diff suppressed because it is too large Load Diff

View File

@ -19,34 +19,31 @@ from __future__ import print_function
"""Tests for the Auth and AuthZ logging.
"""
from samba import auth
import samba.tests
from samba.messaging import Messaging
from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
from samba.dcerpc import srvsvc, dnsserver
import time
import json
import os
import re
from samba import smb
from samba.samdb import SamDB
class AuthLogTestBase(samba.tests.TestCase):
def setUp(self):
super(AuthLogTestBase, self).setUp()
lp_ctx = self.get_loadparm()
self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx);
self.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
self.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
def messageHandler( context, msgType, src, message):
def messageHandler(context, msgType, src, message):
# This does not look like sub unit output and it
# makes these tests much easier to debug.
print(message)
jsonMsg = json.loads(message)
context["messages"].append( jsonMsg)
context["messages"].append(jsonMsg)
self.context = { "messages": []}
self.context = {"messages": []}
self.msg_handler_and_context = (messageHandler, self.context)
self.msg_ctx.register(self.msg_handler_and_context,
msg_type=MSG_AUTH_LOG)
@ -62,20 +59,19 @@ class AuthLogTestBase(samba.tests.TestCase):
self.msg_ctx.deregister(self.msg_handler_and_context,
msg_type=MSG_AUTH_LOG)
def waitForMessages(self, isLastExpectedMessage, connection=None):
"""Wait for all the expected messages to arrive
The connection is passed through to keep the connection alive
until all the logging messages have been received.
"""
def completed( messages):
def completed(messages):
for message in messages:
if isRemote( message) and isLastExpectedMessage( message):
if isRemote(message) and isLastExpectedMessage(message):
return True
return False
def isRemote( message):
def isRemote(message):
remote = None
if message["type"] == "Authorization":
remote = message["Authorization"]["remoteAddress"]
@ -93,19 +89,19 @@ class AuthLogTestBase(samba.tests.TestCase):
self.connection = connection
start_time = time.time()
while not completed( self.context["messages"]):
while not completed(self.context["messages"]):
self.msg_ctx.loop_once(0.1)
if time.time() - start_time > 1:
self.connection = None
return []
self.connection = None
return filter( isRemote, self.context["messages"])
return filter(isRemote, self.context["messages"])
# Discard any previously queued messages.
def discardMessages(self):
self.msg_ctx.loop_once(0.001)
while len( self.context["messages"]):
while len(self.context["messages"]):
self.msg_ctx.loop_once(0.001)
self.context["messages"] = []
@ -123,6 +119,7 @@ class AuthLogTestBase(samba.tests.TestCase):
return list(filter(is_not_netlogon, messages))
GUID_RE = "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
#
# Is the supplied GUID string correctly formatted
#

View File

@ -18,19 +18,12 @@
"""Tests for the Auth and AuthZ logging.
"""
from samba import auth
import samba.tests
from samba.messaging import Messaging
from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
from samba.credentials import DONT_USE_KERBEROS
from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN
from samba.dcerpc import samr
import time
import json
import os
from samba import smb
from samba.samdb import SamDB
import samba.tests.auth_log_base
from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase):
@ -39,25 +32,23 @@ class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase):
self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
def tearDown(self):
super(AuthLogTestsNcalrpc , self).tearDown()
super(AuthLogTestsNcalrpc, self).tearDown()
def _test_rpc_ncaclrpc(self, authTypes, binding, creds,
protection, checkFunction):
def isLastExpectedMessage( msg):
def isLastExpectedMessage(msg):
return (
msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
msg["Authorization"]["authType"] == authTypes[0] and
msg["Authorization"]["transportProtection"] == protection
)
msg["Authorization"]["transportProtection"] == protection)
if binding:
binding = "[%s]" % binding
samr.samr("ncalrpc:%s" % binding, self.get_loadparm(), creds)
messages = self.waitForMessages( isLastExpectedMessage)
messages = self.waitForMessages(isLastExpectedMessage)
checkFunction(messages, authTypes, protection)
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, protection):
@ -81,9 +72,9 @@ class AuthLogTestsNcalrpc(samba.tests.auth_log_base.AuthLogTestBase):
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("DCE/RPC",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2],
msg["Authentication"]["authDescription"])
def test_ncalrpc_ntlm_dns_sign(self):

View File

@ -38,6 +38,7 @@ from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
from samba.dcerpc.misc import SEC_CHAN_WKSTA
from samba.dcerpc.netlogon import NETLOGON_NEG_STRONG_KEYS
class AuthLogTestsNetLogonBadCreds(samba.tests.auth_log_base.AuthLogTestBase):
def setUp(self):

View File

@ -19,23 +19,20 @@ from __future__ import print_function
"""Tests for the Auth and AuthZ logging of password changes.
"""
from samba import auth
import samba.tests
from samba.messaging import Messaging
from samba.samdb import SamDB
from samba.auth import system_session
import json
import os
import samba.tests.auth_log_base
from samba.tests import delete_force
from samba.net import Net
from samba import ntstatus
import samba
from subprocess import call
from ldb import LdbError
USER_NAME = "authlogtestuser"
USER_PASS = samba.generate_random_password(32,32)
USER_PASS = samba.generate_random_password(32, 32)
class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
@ -56,9 +53,6 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
base_dn = self.ldb.domain_dn()
print("base_dn %s" % base_dn)
# Gets back the configuration basedn
configuration_dn = self.ldb.get_config_basedn().get_linearized()
# Get the old "dSHeuristics" if it was set
dsheuristics = self.ldb.get_dsheuristics()
@ -82,10 +76,10 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
# (Re)adds the test user USER_NAME with password USER_PASS
delete_force(self.ldb, "cn=" + USER_NAME + ",cn=users," + self.base_dn)
self.ldb.add({
"dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
"objectclass": "user",
"sAMAccountName": USER_NAME,
"userPassword": USER_PASS
"dn": "cn=" + USER_NAME + ",cn=users," + self.base_dn,
"objectclass": "user",
"sAMAccountName": USER_NAME,
"userPassword": USER_PASS
})
# discard any auth log messages for the password setup
@ -94,18 +88,16 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
def tearDown(self):
super(AuthLogPassChangeTests, self).tearDown()
def test_admin_change_password(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["status"]
== "NT_STATUS_OK" and
msg["Authentication"]["serviceDescription"]
== "SAMR Password Change" and
msg["Authentication"]["authDescription"]
== "samr_ChangePasswordUser3")
return ((msg["type"] == "Authentication") and
(msg["Authentication"]["status"] == "NT_STATUS_OK") and
(msg["Authentication"]["serviceDescription"] ==
"SAMR Password Change") and
(msg["Authentication"]["authDescription"] ==
"samr_ChangePasswordUser3"))
creds = self.insta_creds(template = self.get_credentials())
creds = self.insta_creds(template=self.get_credentials())
lp = self.get_loadparm()
net = Net(creds, lp, server=self.server_ip)
@ -115,7 +107,6 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
username=USER_NAME,
oldpassword=USER_PASS)
messages = self.waitForMessages(isLastExpectedMessage)
print("Received %d messages" % len(messages))
self.assertEquals(8,
@ -124,13 +115,13 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
def test_admin_change_password_new_password_fails_restriction(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["status"]
== "NT_STATUS_PASSWORD_RESTRICTION" and
msg["Authentication"]["serviceDescription"]
== "SAMR Password Change" and
msg["Authentication"]["authDescription"]
== "samr_ChangePasswordUser3")
return ((msg["type"] == "Authentication") and
(msg["Authentication"]["status"] ==
"NT_STATUS_PASSWORD_RESTRICTION") and
(msg["Authentication"]["serviceDescription"] ==
"SAMR Password Change") and
(msg["Authentication"]["authDescription"] ==
"samr_ChangePasswordUser3"))
creds = self.insta_creds(template=self.get_credentials())
@ -143,7 +134,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
net.change_password(newpassword=password.encode('utf-8'),
oldpassword=USER_PASS,
username=USER_NAME)
except Exception as msg:
except Exception:
exception_thrown = True
self.assertEquals(True, exception_thrown,
"Expected exception not thrown")
@ -155,13 +146,13 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
def test_admin_change_password_unknown_user(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["status"]
== "NT_STATUS_NO_SUCH_USER" and
msg["Authentication"]["serviceDescription"]
== "SAMR Password Change" and
msg["Authentication"]["authDescription"]
== "samr_ChangePasswordUser3")
return ((msg["type"] == "Authentication") and
(msg["Authentication"]["status"] ==
"NT_STATUS_NO_SUCH_USER") and
(msg["Authentication"]["serviceDescription"] ==
"SAMR Password Change") and
(msg["Authentication"]["authDescription"] ==
"samr_ChangePasswordUser3"))
creds = self.insta_creds(template=self.get_credentials())
@ -174,7 +165,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
net.change_password(newpassword=password.encode('utf-8'),
oldpassword=USER_PASS,
username="badUser")
except Exception as msg:
except Exception:
exception_thrown = True
self.assertEquals(True, exception_thrown,
"Expected exception not thrown")
@ -186,13 +177,13 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
def test_admin_change_password_bad_original_password(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["status"]
== "NT_STATUS_WRONG_PASSWORD" and
msg["Authentication"]["serviceDescription"]
== "SAMR Password Change" and
msg["Authentication"]["authDescription"]
== "samr_ChangePasswordUser3")
return ((msg["type"] == "Authentication") and
(msg["Authentication"]["status"] ==
"NT_STATUS_WRONG_PASSWORD") and
(msg["Authentication"]["serviceDescription"] ==
"SAMR Password Change") and
(msg["Authentication"]["authDescription"] ==
"samr_ChangePasswordUser3"))
creds = self.insta_creds(template=self.get_credentials())
@ -205,7 +196,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
net.change_password(newpassword=password.encode('utf-8'),
oldpassword="badPassword",
username=USER_NAME)
except Exception as msg:
except Exception:
exception_thrown = True
self.assertEquals(True, exception_thrown,
"Expected exception not thrown")
@ -221,19 +212,19 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
# correctly, so we just check it triggers the wrong password path.
def test_rap_change_password(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"]
== "SAMR Password Change" and
msg["Authentication"]["status"]
== "NT_STATUS_WRONG_PASSWORD" and
msg["Authentication"]["authDescription"]
== "OemChangePasswordUser2")
return ((msg["type"] == "Authentication") and
(msg["Authentication"]["serviceDescription"] ==
"SAMR Password Change") and
(msg["Authentication"]["status"] ==
"NT_STATUS_WRONG_PASSWORD") and
(msg["Authentication"]["authDescription"] ==
"OemChangePasswordUser2"))
username = os.environ["USERNAME"]
server = os.environ["SERVER"]
password = os.environ["PASSWORD"]
server_param = "--server=%s" % server
creds = "-U%s%%%s" % (username,password)
creds = "-U%s%%%s" % (username, password)
call(["bin/net", "rap", server_param,
"password", USER_NAME, "notMyPassword", "notGoingToBeMyPassword",
server, creds, "--option=client ipc max protocol=nt1"])
@ -245,23 +236,21 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
def test_ldap_change_password(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["status"]
== "NT_STATUS_OK" and
msg["Authentication"]["serviceDescription"]
== "LDAP Password Change" and
msg["Authentication"]["authDescription"]
== "LDAP Modify")
return ((msg["type"] == "Authentication") and
(msg["Authentication"]["status"] == "NT_STATUS_OK") and
(msg["Authentication"]["serviceDescription"] ==
"LDAP Password Change") and
(msg["Authentication"]["authDescription"] ==
"LDAP Modify"))
new_password = samba.generate_random_password(32,32)
new_password = samba.generate_random_password(32, 32)
self.ldb.modify_ldif(
"dn: cn=" + USER_NAME + ",cn=users," + self.base_dn + "\n" +
"changetype: modify\n" +
"delete: userPassword\n" +
"userPassword: " + USER_PASS + "\n" +
"add: userPassword\n" +
"userPassword: " + new_password + "\n"
)
"userPassword: " + new_password + "\n")
messages = self.waitForMessages(isLastExpectedMessage)
print("Received %d messages" % len(messages))
@ -276,11 +265,10 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
def test_ldap_change_password_bad_user(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"]
== "LDAP" and
msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["authType"] == "krb5")
new_password = samba.generate_random_password(32,32)
new_password = samba.generate_random_password(32, 32)
try:
self.ldb.modify_ldif(
"dn: cn=" + "badUser" + ",cn=users," + self.base_dn + "\n" +
@ -288,8 +276,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
"delete: userPassword\n" +
"userPassword: " + USER_PASS + "\n" +
"add: userPassword\n" +
"userPassword: " + new_password + "\n"
)
"userPassword: " + new_password + "\n")
self.fail()
except LdbError as e:
(num, msg) = e.args
@ -303,15 +290,15 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
def test_ldap_change_password_bad_original_password(self):
def isLastExpectedMessage(msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["status"]
== "NT_STATUS_WRONG_PASSWORD" and
msg["Authentication"]["serviceDescription"]
== "LDAP Password Change" and
msg["Authentication"]["authDescription"]
== "LDAP Modify")
return ((msg["type"] == "Authentication") and
(msg["Authentication"]["status"] ==
"NT_STATUS_WRONG_PASSWORD") and
(msg["Authentication"]["serviceDescription"] ==
"LDAP Password Change") and
(msg["Authentication"]["authDescription"] ==
"LDAP Modify"))
new_password = samba.generate_random_password(32,32)
new_password = samba.generate_random_password(32, 32)
try:
self.ldb.modify_ldif(
"dn: cn=" + USER_NAME + ",cn=users," + self.base_dn + "\n" +
@ -319,8 +306,7 @@ class AuthLogPassChangeTests(samba.tests.auth_log_base.AuthLogTestBase):
"delete: userPassword\n" +
"userPassword: " + "badPassword" + "\n" +
"add: userPassword\n" +
"userPassword: " + new_password + "\n"
)
"userPassword: " + new_password + "\n")
self.fail()
except LdbError as e1:
(num, msg) = e1.args

View File

@ -19,14 +19,8 @@
Tests auth logging tests that exercise SamLogon
"""
from samba import auth
import samba.tests
from samba.messaging import Messaging
from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
import time
import json
import os
from samba import smb
from samba.samdb import SamDB
import samba.tests.auth_log_base
from samba.credentials import (
@ -42,6 +36,7 @@ from samba.tests import delete_force
from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
from samba.dcerpc.misc import SEC_CHAN_WKSTA
class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
def setUp(self):
@ -63,9 +58,8 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
self.samlogon_dn = ("cn=%s,cn=users,%s" %
(self.netbios_name, self.base_dn))
def tearDown(self):
super(AuthLogTestsSamLogon , self).tearDown()
super(AuthLogTestsSamLogon, self).tearDown()
delete_force(self.ldb, self.samlogon_dn)
def _test_samlogon(self, binding, creds, checkFunction):
@ -119,7 +113,6 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
eol.AvId = ntlmssp.MsvAvEOL
target_info.pair = [domainname, computername, eol]
target_info_blob = ndr_pack(target_info)
response = creds.get_ntlm_response(flags=CLI_CRED_NTLMv2_AUTH,
@ -144,15 +137,14 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
validation_level = samba.dcerpc.netlogon.NetlogonValidationSamInfo4
result = netlogon_conn.netr_LogonSamLogonEx(os.environ["SERVER"],
machine_creds.get_workstation(),
logon_level, logon,
validation_level, netr_flags)
result = netlogon_conn.netr_LogonSamLogonEx(
os.environ["SERVER"],
machine_creds.get_workstation(),
logon_level, logon,
validation_level, netr_flags)
(validation, authoritative, netr_flags_out) = result
messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
checkFunction(messages)
@ -173,7 +165,6 @@ class AuthLogTestsSamLogon(samba.tests.auth_log_base.AuthLogTestBase):
self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
def test_ncalrpc_samlogon(self):
creds = self.insta_creds(template=self.get_credentials(),