1
0
mirror of https://github.com/samba-team/samba.git synced 2025-07-31 20:22:15 +03:00

auth_log: Add tests by listening for JSON messages over the message bus

Signed-off-by: Andrew Bartlett <abartlet@samba.org>
Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Pair-programmed-by: Gary Lockyer <gary@catalyst.net.nz>
This commit is contained in:
Andrew Bartlett
2017-03-14 16:43:06 +13:00
parent 41f1da3a1a
commit 3ee82de26d
5 changed files with 1019 additions and 0 deletions

View File

@ -0,0 +1,801 @@
# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Tests for the Auth and AuthZ logging.
"""
from samba import auth
import samba.tests
from samba.messaging import Messaging
from samba.dcerpc.messaging import MSG_AUTH_LOG, AUTH_EVENT_NAME
from samba.dcerpc import srvsvc, dnsserver
import time
import json
import os
from samba import smb
from samba.samdb import SamDB
import samba.tests.auth_log_base
from samba.credentials import Credentials, DONT_USE_KERBEROS, MUST_USE_KERBEROS
from samba import NTSTATUSError
from subprocess import call
class AuthLogTests(samba.tests.auth_log_base.AuthLogTestBase):
def setUp(self):
super(AuthLogTests, self).setUp()
self.remoteAddress = os.environ["CLIENT_IP"]
def tearDown(self):
super(AuthLogTests, self).tearDown()
def _test_rpc_ncacn_np(self, authTypes, creds, service,
binding, protection, checkFunction):
def isLastExpectedMessage( msg):
return (
msg["type"] == "Authorization" and
( msg["Authorization"]["serviceDescription"] == "DCE/RPC" or
msg["Authorization"]["serviceDescription"] == service) and
msg["Authorization"]["authType"] == authTypes[0] and
msg["Authorization"]["transportProtection"] == protection
)
if binding:
binding = "[%s]" % binding
if service == "dnsserver":
x = dnsserver.dnsserver("ncacn_np:%s%s" % (self.server, binding),
self.get_loadparm(),
creds)
elif service == "srvsvc":
x = srvsvc.srvsvc("ncacn_np:%s%s" % (self.server, binding),
self.get_loadparm(),
creds)
# The connection is passed to ensure the server
# messaging context stays up until all the messages have been received.
messages = self.waitForMessages(isLastExpectedMessage, x)
checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
binding, protection):
expected_messages = len(authTypes)
self.assertEquals(expected_messages,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("SMB",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
# Check the second message it should be an Authorization
msg = messages[1]
self.assertEquals("Authorization", msg["type"])
self.assertEquals("SMB",
msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authorization"]["authType"])
self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
# Check the third message it should be an Authentication
# if we are expecting 4 messages
if expected_messages == 4:
def checkServiceDescription( desc):
return (desc == "DCE/RPC" or desc == service)
msg = messages[2]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertTrue(
checkServiceDescription( msg["Authentication"]["serviceDescription"]))
self.assertEquals(authTypes[3], msg["Authentication"]["authDescription"])
def rpc_ncacn_np_krb5_check(self, messages, authTypes, service, binding, protection):
expected_messages = len(authTypes)
self.assertEquals(expected_messages,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
# This is almost certainly Authentication over UDP, and is probably
# returning message too big,
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[1], msg["Authentication"]["authDescription"])
# Check the second message it should be an Authentication
# This this the TCP Authentication in response to the message too big
# response to the UDP Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
# Check the third message it should be an Authorization
msg = messages[2]
self.assertEquals("Authorization", msg["type"])
serviceDescription = "SMB"
print "binding %s" % binding
if binding == "[smb2]":
serviceDescription = "SMB2"
self.assertEquals(serviceDescription,
msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[3], msg["Authorization"]["authType"])
self.assertEquals("SMB", msg["Authorization"]["transportProtection"])
def test_rpc_ncacn_np_ntlm_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_np(["NTLMSSP",
"NTLMSSP",
"NTLMSSP",
"NTLMSSP"],
creds, "dnsserver", "sign", "SIGN",
self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_np(["NTLMSSP",
"NTLMSSP",
"NTLMSSP",
"NTLMSSP"],
creds, "srvsvc", "sign", "SIGN",
self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_dns(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_np(["ncacn_np",
"NTLMSSP",
"NTLMSSP"],
creds, "dnsserver", "", "SMB",
self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_np(["ncacn_np",
"NTLMSSP",
"NTLMSSP"],
creds, "srvsvc", "", "SMB",
self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_krb_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_np(["krb5",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication",
"krb5"],
creds, "dnsserver", "sign", "SIGN",
self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_np(["krb5",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication",
"krb5"],
creds, "srvsvc", "sign", "SIGN",
self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_np(["ncacn_np",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication",
"krb5"],
creds, "dnsserver", "", "SMB",
self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns_smb2(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_np(["ncacn_np",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication",
"krb5"],
creds, "dnsserver", "smb2", "SMB",
self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_np(["ncacn_np",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication",
"krb5"],
creds, "srvsvc", "", "SMB",
self.rpc_ncacn_np_krb5_check)
def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
binding, protection, checkFunction):
def isLastExpectedMessage( msg):
return (
msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
msg["Authorization"]["authType"] == authTypes[0] and
msg["Authorization"]["transportProtection"] == protection
)
if binding:
binding = "[%s]" % binding
if service == "dnsserver":
dnsserver.dnsserver("ncacn_ip_tcp:%s%s" % (self.server, binding),
self.get_loadparm(),
creds)
elif service == "srvsvc":
srvsvc.srvsvc("ncacn_ip_tcp:%s%s" % (self.server, binding),
self.get_loadparm(),
creds)
messages = self.waitForMessages( isLastExpectedMessage)
checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
binding, protection):
expected_messages = len(authTypes)
self.assertEquals(expected_messages,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authorization
msg = messages[0]
self.assertEquals("Authorization", msg["type"])
self.assertEquals("DCE/RPC",
msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("DCE/RPC",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
binding, protection):
expected_messages = len(authTypes)
self.assertEquals(expected_messages,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authorization
msg = messages[0]
self.assertEquals("Authorization", msg["type"])
self.assertEquals("DCE/RPC",
msg["Authorization"]["serviceDescription"])
self.assertEquals(authTypes[1], msg["Authorization"]["authType"])
self.assertEquals("NONE", msg["Authorization"]["transportProtection"])
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
# Check the third message it should be an Authentication
msg = messages[2]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals(authTypes[2], msg["Authentication"]["authDescription"])
def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
creds, "dnsserver", "sign", "SIGN",
self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["krb5",
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
creds, "dnsserver", "sign", "SIGN",
self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
creds, "dnsserver", "", "SIGN",
self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["krb5",
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
creds, "dnsserver", "", "SIGN",
self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
creds, "dnsserver", "connect", "NONE",
self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["krb5",
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
creds, "dnsserver", "connect", "NONE",
self.rpc_ncacn_ip_tcp_krb5_check)
def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["NTLMSSP",
"ncacn_ip_tcp",
"NTLMSSP"],
creds, "dnsserver", "seal", "SEAL",
self.rpc_ncacn_ip_tcp_ntlm_check)
def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=MUST_USE_KERBEROS)
self._test_rpc_ncacn_ip_tcp(["krb5",
"ncacn_ip_tcp",
"ENC-TS Pre-authentication",
"ENC-TS Pre-authentication"],
creds, "dnsserver", "seal", "SEAL",
self.rpc_ncacn_ip_tcp_krb5_check)
def test_ldap(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "SIGN" and
msg["Authorization"]["authType"] == "krb5")
self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
lp = self.get_loadparm(),
credentials=self.get_credentials())
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(3,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
# Check the first message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
def test_ldap_ntlm(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "SEAL" and
msg["Authorization"]["authType"] == "NTLMSSP")
self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
lp = self.get_loadparm(),
credentials=self.get_credentials())
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("LDAP",
msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP", msg["Authentication"]["authDescription"])
def test_ldap_simple_bind(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "LDAP" and
msg["Authorization"]["transportProtection"] == "TLS" and
msg["Authorization"]["authType"] == "simple bind")
creds = self.insta_creds(template=self.get_credentials())
creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
creds.get_username()))
self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
lp = self.get_loadparm(),
credentials=creds)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("LDAP",
msg["Authentication"]["serviceDescription"])
self.assertEquals("simple bind",
msg["Authentication"]["authDescription"])
def test_smb(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "SMB" and
msg["Authorization"]["authType"] == "krb5" and
msg["Authorization"]["transportProtection"] == "SMB")
creds = self.insta_creds(template=self.get_credentials())
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(3,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("Kerberos KDC",
msg["Authentication"]["serviceDescription"])
self.assertEquals("ENC-TS Pre-authentication",
msg["Authentication"]["authDescription"])
def test_smb_bad_password(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"]
== "Kerberos KDC" and
msg["Authentication"]["status"]
== "NT_STATUS_WRONG_PASSWORD" and
msg["Authentication"]["authDescription"]
== "ENC-TS Pre-authentication")
creds = self.insta_creds(template=self.get_credentials())
creds.set_password("badPassword")
thrown = False
try:
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds)
except NTSTATUSError:
thrown = True
self.assertEquals( thrown, True)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_bad_user(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"]
== "Kerberos KDC" and
msg["Authentication"]["status"]
== "NT_STATUS_NO_SUCH_USER" and
msg["Authentication"]["authDescription"]
== "ENC-TS Pre-authentication")
creds = self.insta_creds(template=self.get_credentials())
creds.set_username("badUser")
thrown = False
try:
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds)
except NTSTATUSError:
thrown = True
self.assertEquals( thrown, True)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_anonymous(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "SMB" and
msg["Authorization"]["authType"] == "NTLMSSP" and
msg["Authorization"]["account"] == "ANONYMOUS LOGON" and
msg["Authorization"]["transportProtection"] == "SMB")
server = os.environ["SERVER"]
path = "//%s/IPC$" % server
auth = "-N"
call(["bin/smbclient", path, auth, "-c quit"])
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(3,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_NO_SUCH_USER",
msg["Authentication"]["status"])
self.assertEquals("SMB",
msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP",
msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
msg["Authentication"]["passwordType"])
# Check the second message it should be an Authentication
msg = messages[1]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK",
msg["Authentication"]["status"])
self.assertEquals("SMB",
msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP",
msg["Authentication"]["authDescription"])
self.assertEquals("No-Password",
msg["Authentication"]["passwordType"])
self.assertEquals("ANONYMOUS LOGON",
msg["Authentication"]["becameAccount"])
def test_smb_no_krb_spnego(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "SMB" and
msg["Authorization"]["authType"] == "NTLMSSP" and
msg["Authorization"]["transportProtection"] == "SMB")
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("SMB",
msg["Authentication"]["serviceDescription"])
self.assertEquals("NTLMSSP",
msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv2",
msg["Authentication"]["passwordType"])
def test_smb_no_krb_spnego_bad_password(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "NTLMSSP" and
msg["Authentication"]["passwordType"] == "NTLMv2" and
msg["Authentication"]["status"]
== "NT_STATUS_WRONG_PASSWORD")
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
creds.set_password("badPassword")
thrown = False
try:
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds)
except NTSTATUSError:
thrown = True
self.assertEquals( thrown, True)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_no_krb_spnego_bad_user(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "NTLMSSP" and
msg["Authentication"]["passwordType"] == "NTLMv2" and
msg["Authentication"]["status"]
== "NT_STATUS_NO_SUCH_USER")
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
creds.set_username("badUser")
thrown = False
try:
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds)
except NTSTATUSError:
thrown = True
self.assertEquals( thrown, True)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authorization" and
msg["Authorization"]["serviceDescription"] == "SMB" and
msg["Authorization"]["authType"] == "bare-NTLM" and
msg["Authorization"]["transportProtection"] == "SMB")
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds,
ntlmv2_auth = False,
use_spnego = False )
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(2,
len(messages),
"Did not receive the expected number of messages")
# Check the first message it should be an Authentication
msg = messages[0]
self.assertEquals("Authentication", msg["type"])
self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
self.assertEquals("SMB",
msg["Authentication"]["serviceDescription"])
self.assertEquals("bare-NTLM",
msg["Authentication"]["authDescription"])
self.assertEquals("NTLMv1",
msg["Authentication"]["passwordType"])
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "bare-NTLM" and
msg["Authentication"]["passwordType"] == "NTLMv1" and
msg["Authentication"]["status"]
== "NT_STATUS_WRONG_PASSWORD")
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
creds.set_password("badPassword")
thrown = False
try:
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds,
ntlmv2_auth = False,
use_spnego = False )
except NTSTATUSError:
thrown = True
self.assertEquals( thrown, True)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
def isLastExpectedMessage( msg):
return (msg["type"] == "Authentication" and
msg["Authentication"]["serviceDescription"] == "SMB" and
msg["Authentication"]["authDescription"] == "bare-NTLM" and
msg["Authentication"]["passwordType"] == "NTLMv1" and
msg["Authentication"]["status"]
== "NT_STATUS_NO_SUCH_USER")
creds = self.insta_creds(template=self.get_credentials(),
kerberos_state=DONT_USE_KERBEROS)
creds.set_username("badUser")
thrown = False
try:
smb.SMB(self.server,
"sysvol",
lp=self.get_loadparm(),
creds=creds,
ntlmv2_auth = False,
use_spnego = False )
except NTSTATUSError:
thrown = True
self.assertEquals( thrown, True)
messages = self.waitForMessages( isLastExpectedMessage)
self.assertEquals(1,
len(messages),
"Did not receive the expected number of messages")