2017-03-14 06:43:06 +03:00
# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""Tests for the Auth and AuthZ logging."""
import samba . tests
from samba . messaging import Messaging
from samba . dcerpc . messaging import MSG_AUTH_LOG , AUTH_EVENT_NAME
2019-02-26 00:53:43 +03:00
from samba . param import LoadParm
2017-03-14 06:43:06 +03:00
import time
import json
import os
2018-04-30 00:13:58 +03:00
import re
2017-03-14 06:43:06 +03:00
2018-12-17 00:04:42 +03:00
2017-03-14 06:43:06 +03:00
class AuthLogTestBase(samba.tests.TestCase):
    """Base class for tests that inspect Auth and AuthZ log messages.

    setUpClass() registers a handler for MSG_AUTH_LOG on a messaging
    context.  Every message the handler receives is printed, JSON-decoded
    and appended to ``cls.context["messages"]``; the helpers below wait
    for, filter and discard those collected messages.
    """

    # Pattern for a lower-case, hyphen-separated GUID string.
    GUID_RE = re.compile(
        r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}")

    # Message types whose payload is read for a "remoteAddress" field.
    _REMOTE_ADDRESS_TYPES = frozenset({
        "Authentication",
        "Authorization",
    })

    @classmethod
    def setUpClass(cls):
        """Attach to the messaging bus and start collecting auth messages."""
        super().setUpClass()

        # connect to the server's messaging bus (we need to explicitly load a
        # different smb.conf here, because in all other respects this test
        # wants to act as a separate remote client)
        server_conf = os.getenv('SERVERCONFFILE')
        if server_conf:
            lp_ctx = LoadParm(filename_for_non_global_lp=server_conf)
        else:
            lp_ctx = samba.tests.env_loadparm()
        cls.msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
        cls.msg_ctx.irpc_add_name(AUTH_EVENT_NAME)

        # Now switch back to using the client-side smb.conf. The tests will
        # use the first interface in the client.conf (we need to strip off
        # the subnet mask portion)
        lp_ctx = samba.tests.env_loadparm()
        client_ip_and_mask = lp_ctx.get('interfaces')[0]
        client_ip = client_ip_and_mask.split('/')[0]

        # the messaging ctx is the server's view of the world, so our own
        # client IP will be the remoteAddress when connections are logged
        cls.remoteAddress = client_ip

        def messageHandler(context, msgType, src, message):
            # This does not look like sub unit output and it
            # makes these tests much easier to debug.
            print(message)
            jsonMsg = json.loads(message)
            context["messages"].append(jsonMsg)

        cls.context = {"messages": []}
        # Keep the (handler, context) pair so tearDownClass() can
        # deregister exactly what was registered here.
        cls.msg_handler_and_context = (messageHandler, cls.context)
        cls.msg_ctx.register(cls.msg_handler_and_context,
                             msg_type=MSG_AUTH_LOG)

        cls.server = os.environ["SERVER"]
        cls.connection = None

    @classmethod
    def tearDownClass(cls):
        """Undo the registrations made in setUpClass()."""
        cls.msg_ctx.deregister(cls.msg_handler_and_context,
                               msg_type=MSG_AUTH_LOG)
        cls.msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)

        super().tearDownClass()

    def setUp(self):
        """Start every test with an empty message queue."""
        super().setUp()
        type(self).discardMessages()

    def waitForMessages(self, isLastExpectedMessage, connection=None):
        """Wait for all the expected messages to arrive.

        The connection is passed through to keep the connection alive
        until all the logging messages have been received.

        :param isLastExpectedMessage: callable taking one decoded message
            and returning true for the message that ends the wait.
        :param connection: object held on ``self.connection`` for the
            duration of the wait.
        :return: the collected messages accepted by the remote-address
            filter, or an empty list if the final message has not been
            seen after roughly one second.
        """

        def isRemote(message):
            # With no client address configured, accept every message.
            if self.remoteAddress is None:
                return True

            message_type = message["type"]
            if message_type not in self._REMOTE_ADDRESS_TYPES:
                return False

            # The address is compared against the second ":"-separated
            # field of remoteAddress; a value without a ":" never matches.
            addr = message[message_type]["remoteAddress"].split(":")
            return len(addr) > 1 and addr[1] == self.remoteAddress

        def completed(messages):
            return any(isRemote(message) and isLastExpectedMessage(message)
                       for message in messages)

        self.connection = connection
        try:
            # A monotonic clock keeps the timeout immune to wall-clock jumps.
            start_time = time.monotonic()
            while not completed(self.context["messages"]):
                self.msg_ctx.loop_once(0.1)
                if time.monotonic() - start_time > 1:
                    return []
            return list(filter(isRemote, self.context["messages"]))
        finally:
            # Drop the reference on every exit path, including exceptions.
            self.connection = None

    @classmethod
    def discardMessages(cls):
        """Discard any previously queued messages."""
        messages = cls.context["messages"]

        while True:
            messages.clear()

            # tevent presumably has other tasks to run, so we might need two or
            # three loops before a message comes through.
            for _ in range(5):
                cls.msg_ctx.loop_once(0.001)

            if not messages:
                # No new messages. We've probably got them all.
                break

    def remove_netlogon_messages(self, messages):
        """Return ``messages`` without the NETLOGON authentication messages.

        NETLOGON is only performed once per session, so to avoid ordering
        dependencies within the tests it's best to strip out NETLOGON
        messages.  Messages without an "Authentication" entry are kept.
        """
        return [
            msg for msg in messages
            if "Authentication" not in msg
            or msg["Authentication"]["serviceDescription"] != "NETLOGON"
        ]

    def is_guid(self, guid):
        """Check whether the supplied GUID string is correctly formatted.

        :param guid: the string to check.
        :return: the match object (truthy) when the whole string matches
            GUID_RE, otherwise None.
        """
        return self.GUID_RE.fullmatch(guid)