2017-03-14 16:43:06 +13:00
# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
2018-03-09 13:38:42 +00:00
from __future__ import print_function
2017-03-14 16:43:06 +13:00
""" Tests for the Auth and AuthZ logging.
"""
import samba . tests
from samba . dcerpc import srvsvc , dnsserver
import os
from samba import smb
from samba . samdb import SamDB
import samba . tests . auth_log_base
2018-04-30 10:35:25 +12:00
from samba . credentials import DONT_USE_KERBEROS , MUST_USE_KERBEROS
2017-03-14 16:43:06 +13:00
from samba import NTSTATUSError
from subprocess import call
2017-03-23 12:39:25 +13:00
from ldb import LdbError
2017-03-14 16:43:06 +13:00
2018-04-30 10:35:25 +12:00
2017-03-14 16:43:06 +13:00
class AuthLogTests ( samba . tests . auth_log_base . AuthLogTestBase ) :
def setUp(self):
    """Run the base-class setUp and record the client address.

    The address is read from the CLIENT_IP environment variable and
    stored on the instance as remoteAddress.  A missing variable raises
    KeyError.
    """
    super(AuthLogTests, self).setUp()
    self.remoteAddress = os.environ["CLIENT_IP"]
def tearDown(self):
    """Delegate all clean-up to the base-class tearDown."""
    super(AuthLogTests, self).tearDown()
def _test_rpc_ncacn_np(self, authTypes, creds, service,
                       binding, protection, checkFunction):
    """Connect to an RPC service over ncacn_np and check the log messages.

    authTypes     -- list of expected values; element 0 is the authType
                     of the final Authorization message, the remaining
                     elements are interpreted by checkFunction
    creds         -- credentials handed to the RPC client
    service       -- "dnsserver" or "srvsvc"
    binding       -- binding options, or "" for none
    protection    -- expected transportProtection of the final
                     Authorization message
    checkFunction -- called as checkFunction(messages, authTypes,
                     service, binding, protection), where binding is the
                     bracketed form built below

    Raises ValueError for any other service name.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] in ("DCE/RPC", service) and
                authz["authType"] == authTypes[0] and
                authz["transportProtection"] == protection)

    if binding:
        binding = "[%s]" % binding

    url = "ncacn_np:%s%s" % (self.server, binding)
    if service == "dnsserver":
        x = dnsserver.dnsserver(url, self.get_loadparm(), creds)
    elif service == "srvsvc":
        x = srvsvc.srvsvc(url, self.get_loadparm(), creds)
    else:
        # Previously this fell through and failed later with an
        # UnboundLocalError on the connection variable.
        raise ValueError("Unsupported service: %s" % service)

    # The connection is passed to ensure the server
    # messaging context stays up until all the messages have been received.
    messages = self.waitForMessages(isLastExpectedMessage, x)
    checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_np_ntlm_check(self, messages, authTypes, service,
                            binding, protection):
    """Check the messages logged for an NTLM connection over ncacn_np.

    len(authTypes) messages are expected: an SMB Authentication, an SMB
    Authorization and, when four messages are expected, a further
    Authentication whose serviceDescription is "DCE/RPC" or service.
    authTypes[1], authTypes[2] and authTypes[3] are the expected
    authDescription / authType values of those messages, in order.
    binding and protection are not examined here.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    authn = messages[0]
    self.assertEqual("Authentication", authn["type"])
    self.assertEqual("NT_STATUS_OK", authn["Authentication"]["status"])
    self.assertEqual("SMB",
                     authn["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     authn["Authentication"]["authDescription"])

    # Check the second message it should be an Authorization
    authz = messages[1]
    self.assertEqual("Authorization", authz["type"])
    self.assertEqual("SMB",
                     authz["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[2], authz["Authorization"]["authType"])
    self.assertEqual("SMB",
                     authz["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(authz["Authorization"]["sessionId"]))

    # Check the third message it should be an Authentication
    # if we are expecting 4 messages
    if expected_messages == 4:
        rpc_authn = messages[2]
        self.assertEqual("Authentication", rpc_authn["type"])
        self.assertEqual("NT_STATUS_OK",
                         rpc_authn["Authentication"]["status"])
        self.assertIn(rpc_authn["Authentication"]["serviceDescription"],
                      ("DCE/RPC", service))
        self.assertEqual(authTypes[3],
                         rpc_authn["Authentication"]["authDescription"])
2017-03-14 16:43:06 +13:00
2018-04-30 10:35:25 +12:00
def rpc_ncacn_np_krb5_check(
        self,
        messages,
        authTypes,
        service,
        binding,
        protection):
    """Check the messages logged for a Kerberos connection over ncacn_np.

    len(authTypes) messages are expected.  The first two must be
    successful "Kerberos KDC" Authentications (authDescription
    authTypes[1] and authTypes[2]); the third must be an Authorization
    with authType authTypes[3], whose serviceDescription is "SMB2" when
    binding is "[smb2]" and "SMB" otherwise.  service and protection are
    not examined here.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    # This is almost certainly Authentication over UDP, and is probably
    # returning message too big,
    msg = messages[0]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[1],
                     msg["Authentication"]["authDescription"])

    # Check the second message it should be an Authentication
    # This is the TCP Authentication in response to the message too big
    # response to the UDP Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])

    # Check the third message it should be an Authorization
    msg = messages[2]
    self.assertEqual("Authorization", msg["type"])
    print("binding %s" % binding)
    serviceDescription = "SMB2" if binding == "[smb2]" else "SMB"
    self.assertEqual(serviceDescription,
                     msg["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[3], msg["Authorization"]["authType"])
    self.assertEqual("SMB", msg["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_np_ntlm_dns_sign(self):
    # NTLM (Kerberos disabled) to dnsserver over ncacn_np with the "sign"
    # binding option; the final Authorization must report SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["NTLMSSP", "NTLMSSP", "NTLMSSP", "NTLMSSP"]
    self._test_rpc_ncacn_np(authTypes, creds, "dnsserver", "sign", "SIGN",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv_sign(self):
    # NTLM (Kerberos disabled) to srvsvc over ncacn_np with the "sign"
    # binding option; the final Authorization must report SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["NTLMSSP", "NTLMSSP", "NTLMSSP", "NTLMSSP"]
    self._test_rpc_ncacn_np(authTypes, creds, "srvsvc", "sign", "SIGN",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_dns(self):
    # NTLM (Kerberos disabled) to dnsserver over ncacn_np with no binding
    # options; three messages are expected, ending with an "ncacn_np"
    # Authorization protected by SMB.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["ncacn_np", "NTLMSSP", "NTLMSSP"]
    self._test_rpc_ncacn_np(authTypes, creds, "dnsserver", "", "SMB",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_ntlm_srv(self):
    # NTLM (Kerberos disabled) to srvsvc over ncacn_np with no binding
    # options; three messages are expected, ending with an "ncacn_np"
    # Authorization protected by SMB.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["ncacn_np", "NTLMSSP", "NTLMSSP"]
    self._test_rpc_ncacn_np(authTypes, creds, "srvsvc", "", "SMB",
                            self.rpc_ncacn_np_ntlm_check)
def test_rpc_ncacn_np_krb_dns_sign(self):
    # Kerberos-only connection to dnsserver over ncacn_np with the "sign"
    # binding option; the final Authorization must report krb5 and SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["krb5",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication",
                 "krb5"]
    self._test_rpc_ncacn_np(authTypes, creds, "dnsserver", "sign", "SIGN",
                            self.rpc_ncacn_np_krb5_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_np_krb_srv_sign(self):
    # Kerberos-only connection to srvsvc over ncacn_np with the "sign"
    # binding option; the final Authorization must report krb5 and SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["krb5",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication",
                 "krb5"]
    self._test_rpc_ncacn_np(authTypes, creds, "srvsvc", "sign", "SIGN",
                            self.rpc_ncacn_np_krb5_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_np_krb_dns(self):
    # Kerberos-only connection to dnsserver over ncacn_np with no binding
    # options; the final Authorization must report "ncacn_np" and SMB.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["ncacn_np",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication",
                 "krb5"]
    self._test_rpc_ncacn_np(authTypes, creds, "dnsserver", "", "SMB",
                            self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_dns_smb2(self):
    # Kerberos-only connection to dnsserver over ncacn_np with the "smb2"
    # binding option; the check function then expects the Authorization
    # serviceDescription to be "SMB2".
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["ncacn_np",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication",
                 "krb5"]
    self._test_rpc_ncacn_np(authTypes, creds, "dnsserver", "smb2", "SMB",
                            self.rpc_ncacn_np_krb5_check)
def test_rpc_ncacn_np_krb_srv(self):
    # Kerberos-only connection to srvsvc over ncacn_np with no binding
    # options; the final Authorization must report "ncacn_np" and SMB.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["ncacn_np",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication",
                 "krb5"]
    self._test_rpc_ncacn_np(authTypes, creds, "srvsvc", "", "SMB",
                            self.rpc_ncacn_np_krb5_check)
def _test_rpc_ncacn_ip_tcp(self, authTypes, creds, service,
                           binding, protection, checkFunction):
    """Connect to an RPC service over ncacn_ip_tcp and check the messages.

    authTypes     -- list of expected values; element 0 is the authType
                     of the final Authorization message, the remaining
                     elements are interpreted by checkFunction
    creds         -- credentials handed to the RPC client
    service       -- "dnsserver" or "srvsvc"
    binding       -- binding options, or "" for none
    protection    -- expected transportProtection of the final
                     Authorization message
    checkFunction -- called as checkFunction(messages, authTypes,
                     service, binding, protection), where binding is the
                     bracketed form built below

    Raises ValueError for any other service name.
    """
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "DCE/RPC" and
                authz["authType"] == authTypes[0] and
                authz["transportProtection"] == protection)

    if binding:
        binding = "[%s]" % binding

    url = "ncacn_ip_tcp:%s%s" % (self.server, binding)
    if service == "dnsserver":
        conn = dnsserver.dnsserver(url, self.get_loadparm(), creds)
    elif service == "srvsvc":
        conn = srvsvc.srvsvc(url, self.get_loadparm(), creds)
    else:
        # Previously this fell through and failed later with an
        # UnboundLocalError on the connection variable.
        raise ValueError("Unsupported service: %s" % service)

    # The connection object is handed to waitForMessages along with the
    # predicate that recognises the final expected message.
    messages = self.waitForMessages(isLastExpectedMessage, conn)
    checkFunction(messages, authTypes, service, binding, protection)
def rpc_ncacn_ip_tcp_ntlm_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the messages logged for an NTLM connection over ncacn_ip_tcp.

    len(authTypes) messages are expected.  The first must be a "DCE/RPC"
    Authorization with authType authTypes[1] and no transport
    protection; the second a successful "DCE/RPC" Authentication with
    authDescription authTypes[2].  service, binding and protection are
    not examined here.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    authz = messages[0]
    self.assertEqual("Authorization", authz["type"])
    self.assertEqual("DCE/RPC",
                     authz["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1], authz["Authorization"]["authType"])
    self.assertEqual("NONE",
                     authz["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(authz["Authorization"]["sessionId"]))

    # Check the second message it should be an Authentication
    authn = messages[1]
    self.assertEqual("Authentication", authn["type"])
    self.assertEqual("NT_STATUS_OK", authn["Authentication"]["status"])
    self.assertEqual("DCE/RPC",
                     authn["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     authn["Authentication"]["authDescription"])
2017-03-14 16:43:06 +13:00
def rpc_ncacn_ip_tcp_krb5_check(self, messages, authTypes, service,
                                binding, protection):
    """Check the messages logged for a Kerberos connection over ncacn_ip_tcp.

    len(authTypes) messages are expected.  The first must be a "DCE/RPC"
    Authorization with authType authTypes[1] and no transport
    protection; the second and third must be successful "Kerberos KDC"
    Authentications with authDescription authTypes[2] and authTypes[3]
    respectively.  service, binding and protection are not examined
    here.
    """
    expected_messages = len(authTypes)
    self.assertEqual(expected_messages,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authorization
    authz = messages[0]
    self.assertEqual("Authorization", authz["type"])
    self.assertEqual("DCE/RPC",
                     authz["Authorization"]["serviceDescription"])
    self.assertEqual(authTypes[1], authz["Authorization"]["authType"])
    self.assertEqual("NONE",
                     authz["Authorization"]["transportProtection"])
    self.assertTrue(self.is_guid(authz["Authorization"]["sessionId"]))

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[2],
                     msg["Authentication"]["authDescription"])

    # Check the third message it should be an Authentication.
    # It is compared with its own slot, authTypes[3]; the second
    # message's slot (authTypes[2]) was reused here before.
    msg = messages[2]
    self.assertEqual("Authentication", msg["type"])
    self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
    self.assertEqual("Kerberos KDC",
                     msg["Authentication"]["serviceDescription"])
    self.assertEqual(authTypes[3],
                     msg["Authentication"]["authDescription"])
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_ntlm_dns_sign(self):
    # NTLM (Kerberos disabled) to dnsserver over ncacn_ip_tcp with the
    # "sign" binding option; the final Authorization must report SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "sign", "SIGN",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_krb5_dns_sign(self):
    # Kerberos-only connection to dnsserver over ncacn_ip_tcp with the
    # "sign" binding option; the final Authorization must report krb5
    # and SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["krb5",
                 "ncacn_ip_tcp",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "sign", "SIGN",
                                self.rpc_ncacn_ip_tcp_krb5_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_ntlm_dns(self):
    # NTLM (Kerberos disabled) to dnsserver over ncacn_ip_tcp with no
    # binding options; the final Authorization is still expected to
    # report SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "", "SIGN",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_krb5_dns(self):
    # Kerberos-only connection to dnsserver over ncacn_ip_tcp with no
    # binding options; the final Authorization is still expected to
    # report SIGN.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["krb5",
                 "ncacn_ip_tcp",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "", "SIGN",
                                self.rpc_ncacn_ip_tcp_krb5_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_ntlm_dns_connect(self):
    # NTLM (Kerberos disabled) to dnsserver over ncacn_ip_tcp with the
    # "connect" binding option; the final Authorization must report
    # NONE as its transport protection.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "connect", "NONE",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_krb5_dns_connect(self):
    # Kerberos-only connection to dnsserver over ncacn_ip_tcp with the
    # "connect" binding option; the final Authorization must report
    # NONE as its transport protection.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["krb5",
                 "ncacn_ip_tcp",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "connect", "NONE",
                                self.rpc_ncacn_ip_tcp_krb5_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_ntlm_dns_seal(self):
    # NTLM (Kerberos disabled) to dnsserver over ncacn_ip_tcp with the
    # "seal" binding option; the final Authorization must report SEAL.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    authTypes = ["NTLMSSP", "ncacn_ip_tcp", "NTLMSSP"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "seal", "SEAL",
                                self.rpc_ncacn_ip_tcp_ntlm_check)
2017-03-14 16:43:06 +13:00
def test_rpc_ncacn_ip_tcp_krb5_dns_seal(self):
    # Kerberos-only connection to dnsserver over ncacn_ip_tcp with the
    # "seal" binding option; the final Authorization must report krb5
    # and SEAL.
    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=MUST_USE_KERBEROS)
    authTypes = ["krb5",
                 "ncacn_ip_tcp",
                 "ENC-TS Pre-authentication",
                 "ENC-TS Pre-authentication"]
    self._test_rpc_ncacn_ip_tcp(authTypes, creds,
                                "dnsserver", "seal", "SEAL",
                                self.rpc_ncacn_ip_tcp_krb5_check)
2017-03-14 16:43:06 +13:00
def test_ldap(self):
    # Open an LDAP connection to the host named by the SERVER environment
    # variable using the default credentials, then wait for the krb5
    # LDAP Authorization with SIGN transport protection.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "SIGN" and
                authz["authType"] == "krb5")

    self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=self.get_credentials())

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first and the second message should each be a successful
    # ENC-TS pre-authentication logged by the Kerberos KDC.
    for msg in messages[:2]:
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
        self.assertEqual("Kerberos KDC",
                         msg["Authentication"]["serviceDescription"])
        self.assertEqual("ENC-TS Pre-authentication",
                         msg["Authentication"]["authDescription"])
2017-03-14 16:43:06 +13:00
def test_ldap_ntlm(self):
    # Open an LDAP connection to the address in the SERVER_IP environment
    # variable using the default credentials, then wait for the NTLMSSP
    # LDAP Authorization with SEAL transport protection.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "SEAL" and
                authz["authType"] == "NTLMSSP")

    self.samdb = SamDB(url="ldap://%s" % os.environ["SERVER_IP"],
                       lp=self.get_loadparm(),
                       credentials=self.get_credentials())

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    authn = messages[0]
    self.assertEqual("Authentication", authn["type"])
    self.assertEqual("NT_STATUS_OK", authn["Authentication"]["status"])
    self.assertEqual("LDAP",
                     authn["Authentication"]["serviceDescription"])
    self.assertEqual("NTLMSSP",
                     authn["Authentication"]["authDescription"])
def test_ldap_simple_bind(self):
    # Simple bind over ldaps using a DOMAIN\username bind DN, then wait
    # for the "simple bind" LDAP Authorization protected by TLS.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "TLS" and
                authz["authType"] == "simple bind")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
                                  creds.get_username()))

    self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     "Did not receive the expected number of messages")

    # Check the first message it should be an Authentication
    authn = messages[0]
    self.assertEqual("Authentication", authn["type"])
    self.assertEqual("NT_STATUS_OK", authn["Authentication"]["status"])
    self.assertEqual("LDAP",
                     authn["Authentication"]["serviceDescription"])
    self.assertEqual("simple bind",
                     authn["Authentication"]["authDescription"])
2017-03-14 16:43:06 +13:00
2017-03-23 12:39:25 +13:00
def test_ldap_simple_bind_bad_password(self):
    # Simple bind over ldaps with a wrong password: the connection must
    # raise LdbError and exactly one failed Authentication
    # (NT_STATUS_WRONG_PASSWORD) must be logged.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_WRONG_PASSWORD" and
                authn["authDescription"] == "simple bind")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_password("badPassword")
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(),
                                  creds.get_username()))

    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_simple_bind_bad_user(self):
    # Simple bind over ldaps as DOMAIN\badUser: the connection must raise
    # LdbError and exactly one failed Authentication
    # (NT_STATUS_NO_SUCH_USER) must be logged.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "simple bind")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "badUser"))

    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_simple_bind_unparseable_user(self):
    # Simple bind over ldaps as DOMAIN\abdcef: the connection must raise
    # LdbError and exactly one failed Authentication
    # (NT_STATUS_NO_SUCH_USER) must be logged.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "LDAP" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "simple bind")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_bind_dn("%s\\%s" % (creds.get_domain(), "abdcef"))

    with self.assertRaises(LdbError):
        self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                           lp=self.get_loadparm(),
                           credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
2017-03-24 11:02:36 +13:00
#
# Note: as this test does not expect any messages it will
# time out in the call to self.waitForMessages.
# This is expected, but it will slow this test.
def test_ldap_anonymous_access_bind_only(self):
    # Should be no logging for anonymous bind
    # so receiving any message indicates a failure.
    def isLastExpectedMessage(msg):
        # Accept whatever arrives first; the test then fails on the
        # message count below.
        return True

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_anonymous()
    self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(0,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_ldap_anonymous_access(self):
    # Anonymous search over ldaps: the search must raise LdbError and a
    # single "no bind" Authorization for ANONYMOUS LOGON, protected by
    # TLS, must be logged.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "LDAP" and
                authz["transportProtection"] == "TLS" and
                authz["account"] == "ANONYMOUS LOGON" and
                authz["authType"] == "no bind")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_anonymous()
    self.samdb = SamDB(url="ldaps://%s" % os.environ["SERVER"],
                       lp=self.get_loadparm(),
                       credentials=creds)

    try:
        self.samdb.search(base=self.samdb.domain_dn())
    except LdbError:
        pass
    else:
        self.fail("Expected an LdbError exception")

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
2018-04-30 10:35:25 +12:00
2017-03-14 16:43:06 +13:00
def test_smb(self):
    # Connect to the sysvol share with the default credentials, then wait
    # for the krb5 SMB Authorization.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authorization":
            return False
        authz = msg["Authorization"]
        return (authz["serviceDescription"] == "SMB" and
                authz["authType"] == "krb5" and
                authz["transportProtection"] == "SMB")

    creds = self.insta_creds(template=self.get_credentials())
    smb.SMB(self.server,
            "sysvol",
            lp=self.get_loadparm(),
            creds=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     "Did not receive the expected number of messages")

    # The first and the second message should each be a successful
    # ENC-TS pre-authentication logged by the Kerberos KDC.
    for msg in messages[:2]:
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NT_STATUS_OK", msg["Authentication"]["status"])
        self.assertEqual("Kerberos KDC",
                         msg["Authentication"]["serviceDescription"])
        self.assertEqual("ENC-TS Pre-authentication",
                         msg["Authentication"]["authDescription"])
2017-03-14 16:43:06 +13:00
def test_smb_bad_password(self):
    # Connect to the sysvol share with a wrong password: the connection
    # must raise NTSTATUSError and exactly one failed Kerberos KDC
    # Authentication (NT_STATUS_WRONG_PASSWORD) must be logged.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "Kerberos KDC" and
                authn["status"] == "NT_STATUS_WRONG_PASSWORD" and
                authn["authDescription"] == "ENC-TS Pre-authentication")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_password("badPassword")

    with self.assertRaises(NTSTATUSError):
        smb.SMB(self.server,
                "sysvol",
                lp=self.get_loadparm(),
                creds=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
def test_smb_bad_user(self):
    # Connect to the sysvol share as the user "badUser": the connection
    # must raise NTSTATUSError and exactly one failed Kerberos KDC
    # Authentication (NT_STATUS_NO_SUCH_USER) must be logged.
    def isLastExpectedMessage(msg):
        if msg["type"] != "Authentication":
            return False
        authn = msg["Authentication"]
        return (authn["serviceDescription"] == "Kerberos KDC" and
                authn["status"] == "NT_STATUS_NO_SUCH_USER" and
                authn["authDescription"] == "ENC-TS Pre-authentication")

    creds = self.insta_creds(template=self.get_credentials())
    creds.set_username("badUser")

    with self.assertRaises(NTSTATUSError):
        smb.SMB(self.server,
                "sysvol",
                lp=self.get_loadparm(),
                creds=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     "Did not receive the expected number of messages")
2017-06-20 08:26:45 +02:00
def test_smb1_anonymous(self):
    """Check the auth log for an anonymous smbclient connection over NT1.

    Three messages are expected. The first two are verified in detail:
    an NTLMSSP Authentication failing with NT_STATUS_NO_SUCH_USER,
    followed by one succeeding and becoming "ANONYMOUS LOGON".
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authorization ":
            return False
        authz = msg[" Authorization "]
        return (authz[" serviceDescription "] == " SMB " and
                authz[" authType "] == " NTLMSSP " and
                authz[" account "] == " ANONYMOUS LOGON " and
                authz[" transportProtection "] == " SMB ")

    server = os.environ[" SERVER "]
    path = " // %s /IPC$ " % server
    auth = " -N "
    call([" bin/smbclient ", path, auth, " -mNT1 ", " -c quit "])

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     " Did not receive the expected number of messages ")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual(" Authentication ", msg[" type "])
    self.assertEqual(" NT_STATUS_NO_SUCH_USER ",
                     msg[" Authentication "][" status "])
    self.assertEqual(" SMB ",
                     msg[" Authentication "][" serviceDescription "])
    self.assertEqual(" NTLMSSP ",
                     msg[" Authentication "][" authDescription "])
    self.assertEqual(" No-Password ",
                     msg[" Authentication "][" passwordType "])

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual(" Authentication ", msg[" type "])
    self.assertEqual(" NT_STATUS_OK ",
                     msg[" Authentication "][" status "])
    self.assertEqual(" SMB ",
                     msg[" Authentication "][" serviceDescription "])
    self.assertEqual(" NTLMSSP ",
                     msg[" Authentication "][" authDescription "])
    self.assertEqual(" No-Password ",
                     msg[" Authentication "][" passwordType "])
    self.assertEqual(" ANONYMOUS LOGON ",
                     msg[" Authentication "][" becameAccount "])
2017-03-14 16:43:06 +13:00
2017-06-20 08:26:45 +02:00
def test_smb2_anonymous(self):
    """Check the auth log for an anonymous smbclient connection over SMB3.

    Three messages are expected. The first two are verified in detail:
    an NTLMSSP Authentication failing with NT_STATUS_NO_SUCH_USER,
    followed by one succeeding and becoming "ANONYMOUS LOGON". Both are
    expected to report "SMB2" as the service description.
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authorization ":
            return False
        authz = msg[" Authorization "]
        return (authz[" serviceDescription "] == " SMB2 " and
                authz[" authType "] == " NTLMSSP " and
                authz[" account "] == " ANONYMOUS LOGON " and
                authz[" transportProtection "] == " SMB ")

    server = os.environ[" SERVER "]
    path = " // %s /IPC$ " % server
    auth = " -N "
    call([" bin/smbclient ", path, auth, " -mSMB3 ", " -c quit "])

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(3,
                     len(messages),
                     " Did not receive the expected number of messages ")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual(" Authentication ", msg[" type "])
    self.assertEqual(" NT_STATUS_NO_SUCH_USER ",
                     msg[" Authentication "][" status "])
    self.assertEqual(" SMB2 ",
                     msg[" Authentication "][" serviceDescription "])
    self.assertEqual(" NTLMSSP ",
                     msg[" Authentication "][" authDescription "])
    self.assertEqual(" No-Password ",
                     msg[" Authentication "][" passwordType "])

    # Check the second message it should be an Authentication
    msg = messages[1]
    self.assertEqual(" Authentication ", msg[" type "])
    self.assertEqual(" NT_STATUS_OK ",
                     msg[" Authentication "][" status "])
    self.assertEqual(" SMB2 ",
                     msg[" Authentication "][" serviceDescription "])
    self.assertEqual(" NTLMSSP ",
                     msg[" Authentication "][" authDescription "])
    self.assertEqual(" No-Password ",
                     msg[" Authentication "][" passwordType "])
    self.assertEqual(" ANONYMOUS LOGON ",
                     msg[" Authentication "][" becameAccount "])
2017-03-14 16:43:06 +13:00
def test_smb_no_krb_spnego(self):
    """Check the auth log for an SMB connection with Kerberos disabled.

    Two messages are expected; the first must be a successful NTLMSSP
    Authentication for the SMB service using an NTLMv2 password.
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authorization ":
            return False
        authz = msg[" Authorization "]
        return (authz[" serviceDescription "] == " SMB " and
                authz[" authType "] == " NTLMSSP " and
                authz[" transportProtection "] == " SMB ")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    smb.SMB(self.server,
            " sysvol ",
            lp=self.get_loadparm(),
            creds=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     " Did not receive the expected number of messages ")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual(" Authentication ", msg[" type "])
    self.assertEqual(" NT_STATUS_OK ", msg[" Authentication "][" status "])
    self.assertEqual(" SMB ",
                     msg[" Authentication "][" serviceDescription "])
    self.assertEqual(" NTLMSSP ",
                     msg[" Authentication "][" authDescription "])
    self.assertEqual(" NTLMv2 ",
                     msg[" Authentication "][" passwordType "])
2017-03-14 16:43:06 +13:00
def test_smb_no_krb_spnego_bad_password(self):
    """Check the auth log for NTLMSSP over SMB with a wrong password.

    Kerberos is disabled on the credentials. The connection attempt
    must raise NTSTATUSError, and exactly one message is expected: an
    NTLMv2 Authentication failing with NT_STATUS_WRONG_PASSWORD.
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SMB " and
                auth[" authDescription "] == " NTLMSSP " and
                auth[" passwordType "] == " NTLMv2 " and
                auth[" status "] == " NT_STATUS_WRONG_PASSWORD ")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password(" badPassword ")

    # assertRaises replaces the hand-rolled "thrown" flag and reports
    # the missing exception by name on failure.
    with self.assertRaises(NTSTATUSError):
        smb.SMB(self.server,
                " sysvol ",
                lp=self.get_loadparm(),
                creds=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     " Did not receive the expected number of messages ")
def test_smb_no_krb_spnego_bad_user(self):
    """Check the auth log for NTLMSSP over SMB with an unknown user.

    Kerberos is disabled on the credentials. The connection attempt
    must raise NTSTATUSError, and exactly one message is expected: an
    NTLMv2 Authentication failing with NT_STATUS_NO_SUCH_USER.
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SMB " and
                auth[" authDescription "] == " NTLMSSP " and
                auth[" passwordType "] == " NTLMv2 " and
                auth[" status "] == " NT_STATUS_NO_SUCH_USER ")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username(" badUser ")

    # assertRaises replaces the hand-rolled "thrown" flag and reports
    # the missing exception by name on failure.
    with self.assertRaises(NTSTATUSError):
        smb.SMB(self.server,
                " sysvol ",
                lp=self.get_loadparm(),
                creds=creds)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     " Did not receive the expected number of messages ")
def test_smb_no_krb_no_spnego_no_ntlmv2(self):
    """Check the auth log for SMB without Kerberos, SPNEGO or NTLMv2.

    Two messages are expected; the first must be a successful
    "bare-NTLM" Authentication for the SMB service using an NTLMv1
    password.
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authorization ":
            return False
        authz = msg[" Authorization "]
        return (authz[" serviceDescription "] == " SMB " and
                authz[" authType "] == " bare-NTLM " and
                authz[" transportProtection "] == " SMB ")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    smb.SMB(self.server,
            " sysvol ",
            lp=self.get_loadparm(),
            creds=creds,
            ntlmv2_auth=False,
            use_spnego=False)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(2,
                     len(messages),
                     " Did not receive the expected number of messages ")

    # Check the first message it should be an Authentication
    msg = messages[0]
    self.assertEqual(" Authentication ", msg[" type "])
    self.assertEqual(" NT_STATUS_OK ", msg[" Authentication "][" status "])
    self.assertEqual(" SMB ",
                     msg[" Authentication "][" serviceDescription "])
    self.assertEqual(" bare-NTLM ",
                     msg[" Authentication "][" authDescription "])
    self.assertEqual(" NTLMv1 ",
                     msg[" Authentication "][" passwordType "])
2017-03-14 16:43:06 +13:00
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_password(self):
    """Check the auth log for bare NTLMv1 over SMB with a wrong password.

    Kerberos, SPNEGO and NTLMv2 are all disabled. The connection
    attempt must raise NTSTATUSError, and exactly one message is
    expected: an NTLMv1 Authentication failing with
    NT_STATUS_WRONG_PASSWORD.
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SMB " and
                auth[" authDescription "] == " bare-NTLM " and
                auth[" passwordType "] == " NTLMv1 " and
                auth[" status "] == " NT_STATUS_WRONG_PASSWORD ")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_password(" badPassword ")

    # assertRaises replaces the hand-rolled "thrown" flag and reports
    # the missing exception by name on failure.
    with self.assertRaises(NTSTATUSError):
        smb.SMB(self.server,
                " sysvol ",
                lp=self.get_loadparm(),
                creds=creds,
                ntlmv2_auth=False,
                use_spnego=False)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     " Did not receive the expected number of messages ")
def test_smb_no_krb_no_spnego_no_ntlmv2_bad_user(self):
    """Check the auth log for bare NTLMv1 over SMB with an unknown user.

    Kerberos, SPNEGO and NTLMv2 are all disabled. The connection
    attempt must raise NTSTATUSError, and exactly one message is
    expected: an NTLMv1 Authentication failing with
    NT_STATUS_NO_SUCH_USER.
    """

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SMB " and
                auth[" authDescription "] == " bare-NTLM " and
                auth[" passwordType "] == " NTLMv1 " and
                auth[" status "] == " NT_STATUS_NO_SUCH_USER ")

    creds = self.insta_creds(template=self.get_credentials(),
                             kerberos_state=DONT_USE_KERBEROS)
    creds.set_username(" badUser ")

    # assertRaises replaces the hand-rolled "thrown" flag and reports
    # the missing exception by name on failure.
    with self.assertRaises(NTSTATUSError):
        smb.SMB(self.server,
                " sysvol ",
                lp=self.get_loadparm(),
                creds=creds,
                ntlmv2_auth=False,
                use_spnego=False)

    messages = self.waitForMessages(isLastExpectedMessage)
    self.assertEqual(1,
                     len(messages),
                     " Did not receive the expected number of messages ")
2017-03-24 11:02:36 +13:00
def test_samlogon_interactive(self):
    """Check the auth log for an interactive SamLogon made via rpcclient.

    Waits for a successful "SamLogon" / "interactive" Authentication
    message carrying this test's workstation name, then checks that
    5 or 6 messages remain after remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " interactive " and
                auth[" status "] == " NT_STATUS_OK " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = os.environ[" PASSWORD "]
    samlogon = " samlogon %s %s %s %d " % (user, password, workstation, 1)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_interactive_bad_password(self):
    """Check the auth log for an interactive SamLogon with a bad password.

    Waits for a "SamLogon" / "interactive" Authentication message with
    status NT_STATUS_WRONG_PASSWORD and this test's workstation name,
    then checks that 5 or 6 messages remain after
    remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " interactive " and
                auth[" status "] == " NT_STATUS_WRONG_PASSWORD " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = " badPassword "
    samlogon = " samlogon %s %s %s %d " % (user, password, workstation, 1)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_interactive_bad_user(self):
    """Check the auth log for an interactive SamLogon as an unknown user.

    Waits for a "SamLogon" / "interactive" Authentication message with
    status NT_STATUS_NO_SUCH_USER and this test's workstation name,
    then checks that 5 or 6 messages remain after
    remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " interactive " and
                auth[" status "] == " NT_STATUS_NO_SUCH_USER " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = " badUser "
    password = os.environ[" PASSWORD "]
    samlogon = " samlogon %s %s %s %d " % (user, password, workstation, 1)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_network(self):
    """Check the auth log for a network SamLogon made via rpcclient.

    Waits for a successful "SamLogon" / "network" Authentication
    message carrying this test's workstation name, then checks that
    5 or 6 messages remain after remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_OK " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = os.environ[" PASSWORD "]
    samlogon = " samlogon %s %s %s %d " % (user, password, workstation, 2)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_network_bad_password(self):
    """Check the auth log for a network SamLogon with a bad password.

    Waits for a "SamLogon" / "network" Authentication message with
    status NT_STATUS_WRONG_PASSWORD and this test's workstation name,
    then checks that 5 or 6 messages remain after
    remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_WRONG_PASSWORD " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = " badPassword "
    samlogon = " samlogon %s %s %s %d " % (user, password, workstation, 2)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_network_bad_user(self):
    """Check the auth log for a network SamLogon as an unknown user.

    Waits for a "SamLogon" / "network" Authentication message with
    status NT_STATUS_NO_SUCH_USER and this test's workstation name,
    then checks that 5 or 6 messages remain after
    remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_NO_SUCH_USER " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = " badUser "
    password = os.environ[" PASSWORD "]
    samlogon = " samlogon %s %s %s %d " % (user, password, workstation, 2)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_network_mschap(self):
    """Check the auth log for a network SamLogon logged as MSCHAPv2.

    The samlogon command is given the extra argument 0x00010000. Waits
    for a successful "SamLogon" / "network" Authentication message
    whose passwordType is "MSCHAPv2" and which carries this test's
    workstation name, then checks that 5 or 6 messages remain after
    remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_OK " and
                auth[" passwordType "] == " MSCHAPv2 " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = os.environ[" PASSWORD "]
    samlogon = " samlogon %s %s %s %d 0x00010000 " % (
        user, password, workstation, 2)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_network_mschap_bad_password(self):
    """Check the auth log for an MSCHAPv2 SamLogon with a bad password.

    The samlogon command is given the extra argument 0x00010000. Waits
    for a "SamLogon" / "network" Authentication message with status
    NT_STATUS_WRONG_PASSWORD, passwordType "MSCHAPv2" and this test's
    workstation name, then checks that 5 or 6 messages remain after
    remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_WRONG_PASSWORD " and
                auth[" passwordType "] == " MSCHAPv2 " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = " badPassword "
    samlogon = " samlogon %s %s %s %d 0x00010000 " % (
        user, password, workstation, 2)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_network_mschap_bad_user(self):
    """Check the auth log for an MSCHAPv2 SamLogon as an unknown user.

    The samlogon command is given the extra argument 0x00010000. Waits
    for a "SamLogon" / "network" Authentication message with status
    NT_STATUS_NO_SUCH_USER, passwordType "MSCHAPv2" and this test's
    workstation name, then checks that 5 or 6 messages remain after
    remove_netlogon_messages is applied.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_NO_SUCH_USER " and
                auth[" passwordType "] == " MSCHAPv2 " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = " badUser "
    password = os.environ[" PASSWORD "]
    samlogon = " samlogon %s %s %s %d 0x00010000 " % (
        user, password, workstation, 2)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")
def test_samlogon_schannel_seal(self):
    """Check the auth log for a SamLogon made over schannel.

    rpcclient runs "schannel;samlogon ...". After waiting for the
    successful "SamLogon" / "network" Authentication message, 5 or 6
    messages must remain once remove_netlogon_messages is applied, and
    the second to last must be a DCE/RPC Authorization with authType
    "schannel", SEAL transport protection and a GUID session id.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_OK " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = os.environ[" PASSWORD "]
    samlogon = " schannel;samlogon %s %s %s " % (user, password, workstation)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")

    # Check the second to last message it should be an Authorization
    msg = messages[-2]
    self.assertEqual(" Authorization ", msg[" type "])
    self.assertEqual(" DCE/RPC ",
                     msg[" Authorization "][" serviceDescription "])
    self.assertEqual(" schannel ", msg[" Authorization "][" authType "])
    self.assertEqual(" SEAL ",
                     msg[" Authorization "][" transportProtection "])
    self.assertTrue(self.is_guid(msg[" Authorization "][" sessionId "]))
2017-03-24 11:02:36 +13:00
# Signed logons get promoted to sealed, this test ensures that
2018-04-30 10:35:25 +12:00
# this behaviour is not removed accidentally
2017-03-24 11:02:36 +13:00
def test_samlogon_schannel_sign(self):
    """Check the auth log for a SamLogon made over signed schannel.

    rpcclient runs "schannelsign;samlogon ...", yet the Authorization
    message is still expected to report SEAL transport protection:
    signed logons get promoted to sealed, and this test guards against
    that behaviour being removed accidentally.

    After waiting for the successful "SamLogon" / "network"
    Authentication message, 5 or 6 messages must remain once
    remove_netlogon_messages is applied, and the second to last must be
    a DCE/RPC Authorization with authType "schannel" and a GUID
    session id.
    """
    workstation = " AuthLogTests "

    def isLastExpectedMessage(msg):
        if msg[" type "] != " Authentication ":
            return False
        auth = msg[" Authentication "]
        return (auth[" serviceDescription "] == " SamLogon " and
                auth[" authDescription "] == " network " and
                auth[" status "] == " NT_STATUS_OK " and
                auth[" workstation "] == r" \\ %s " % workstation)

    server = os.environ[" SERVER "]
    user = os.environ[" USERNAME "]
    password = os.environ[" PASSWORD "]
    samlogon = " schannelsign;samlogon %s %s %s " % (
        user, password, workstation)
    call([" bin/rpcclient ", " -c ", samlogon, " -U % ", server])

    messages = self.waitForMessages(isLastExpectedMessage)
    messages = self.remove_netlogon_messages(messages)
    # assertIn shows the actual count on failure, which
    # assertIs(True, <bool>) could not.
    self.assertIn(len(messages),
                  (5, 6),
                  " Did not receive the expected number of messages ")

    # Check the second to last message it should be an Authorization
    msg = messages[-2]
    self.assertEqual(" Authorization ", msg[" type "])
    self.assertEqual(" DCE/RPC ",
                     msg[" Authorization "][" serviceDescription "])
    self.assertEqual(" schannel ", msg[" Authorization "][" authType "])
    self.assertEqual(" SEAL ",
                     msg[" Authorization "][" transportProtection "])
    self.assertTrue(self.is_guid(msg[" Authorization "][" sessionId "]))