1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-12 09:18:10 +03:00

tests: test source4 cmdline/smb.conf log level

The 'log level' line in smb.conf allows messages from different log
classes to be sent to different places, but we have not tested that
this works. Now we do, somewhat.

The test involves running a special binary based on a stripped down
source4/samba/server.c that just starts up, parses the command line
and a given smb.conf, then logs messages from multiple classes and
exits.

Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Douglas Bagnall 2022-05-26 17:19:51 +12:00 committed by Andrew Bartlett
parent 66cabb8fd1
commit c668b5caa9
4 changed files with 579 additions and 0 deletions
lib/util
python/samba/tests
selftest

View File

@ -0,0 +1,197 @@
/*
Unix SMB/CIFS implementation.
A test server that only does logging.
Copyright (C) Andrew Tridgell 1992-2005
Copyright (C) Martin Pool 2002
Copyright (C) Jelmer Vernooij 2002
Copyright (C) James J Myers 2003 <myersjj@samba.org>
Copyright (C) Douglas Bagnall 2022
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "lib/cmdline/cmdline.h"
#define BINARY_NAME "test_s4_logging"
static int log_level = 1;
struct debug_class {
int dclass;
const char *name;
};
struct debug_class classes_table[] = {
{DBGC_ALL, "all"},
{DBGC_TDB, "tdb"},
{DBGC_PRINTDRIVERS, "printdrivers"},
{DBGC_LANMAN, "lanman"},
{DBGC_SMB, "smb"},
{DBGC_RPC_PARSE, "rpc_parse"},
{DBGC_RPC_SRV, "rpc_srv"},
{DBGC_RPC_CLI, "rpc_cli"},
{DBGC_PASSDB, "passdb"},
{DBGC_SAM, "sam"},
{DBGC_AUTH, "auth"},
{DBGC_WINBIND, "winbind"},
{DBGC_VFS, "vfs"},
{DBGC_IDMAP, "idmap"},
{DBGC_QUOTA, "quota"},
{DBGC_ACLS, "acls"},
{DBGC_LOCKING, "locking"},
{DBGC_MSDFS, "msdfs"},
{DBGC_DMAPI, "dmapi"},
{DBGC_REGISTRY, "registry"},
{DBGC_SCAVENGER, "scavenger"},
{DBGC_DNS, "dns"},
{DBGC_LDB, "ldb"},
{DBGC_TEVENT, "tevent"},
{DBGC_AUTH_AUDIT, "auth_audit"},
{DBGC_AUTH_AUDIT_JSON, "auth_json_audit"},
{DBGC_KERBEROS, "kerberos"},
{DBGC_DRS_REPL, "drs_repl"},
{DBGC_SMB2, "smb2"},
{DBGC_SMB2_CREDITS, "smb2_credits"},
{DBGC_DSDB_AUDIT, "dsdb_audit"},
{DBGC_DSDB_AUDIT_JSON, "dsdb_json_audit"},
{DBGC_DSDB_PWD_AUDIT, "dsdb_password_audit"},
{DBGC_DSDB_PWD_AUDIT_JSON, "dsdb_password_json_audit"},
{DBGC_DSDB_TXN_AUDIT, "dsdb_transaction_audit"},
{DBGC_DSDB_TXN_AUDIT_JSON, "dsdb_transaction_json_audit"},
{DBGC_DSDB_GROUP_AUDIT, "dsdb_group_audit"},
{DBGC_DSDB_GROUP_AUDIT_JSON, "dsdb_group_json_audit"},
};
static int log_all_classes(int level)
{
size_t i;
struct debug_class c;
for (i = 0; i < ARRAY_SIZE(classes_table); i++) {
c = classes_table[i];
if (i != c.dclass) {
/*
* we implicitly rely on these values staying in the
* right order.
*/
fprintf(stderr,
"expected '%s' class to have value %zu\n",
c.name, i);
return 1;
}
DEBUGC(c.dclass, level,
("logging for '%s' [%d], at level %d\n",
c.name, c.dclass, level));
/*
* That's it for the tests *here*. The invoker of this
* process will have set up an smb.conf that directs the
* output in particular ways, and will be looking to see that
* happens correctly.
*/
}
return 0;
}
static int init_daemon(TALLOC_CTX *mem_ctx,
int argc,
const char *argv[],
const char **error)
{
poptContext pc;
int opt;
bool ok;
struct poptOption long_options[] = {
POPT_AUTOHELP
{
.longName = "level",
.shortName = 'L',
.argInfo = POPT_ARG_INT,
.arg = &log_level,
.descrip = "log at this level",
.argDescrip = "LEVEL",
},
POPT_COMMON_SAMBA
POPT_COMMON_DAEMON
POPT_COMMON_VERSION
POPT_TABLEEND
};
setproctitle(BINARY_NAME);
ok = samba_cmdline_init(mem_ctx,
SAMBA_CMDLINE_CONFIG_SERVER,
true /* require_smbconf */);
if (!ok) {
*error = "Failed to init cmdline parser!\n";
return EINVAL;
}
pc = samba_popt_get_context(BINARY_NAME,
argc,
argv,
long_options,
0);
if (pc == NULL) {
*error = "Failed to setup popt context!\n";
return ENOTRECOVERABLE;
}
while((opt = poptGetNextOpt(pc)) != -1) {
fprintf(stderr, "\nInvalid option %s: %s\n\n",
poptBadOption(pc, 0), poptStrerror(opt));
poptPrintUsage(pc, stderr, 0);
return 1;
}
poptFreeContext(pc);
return 0;
}
int main(int argc, const char *argv[])
{
TALLOC_CTX *mem_ctx = NULL;
int rc;
const char *error = NULL;
mem_ctx = talloc_init("crazy-logging-test-server.c#main");
if (mem_ctx == NULL) {
exit(ENOMEM);
}
setproctitle_init(argc, discard_const(argv), environ);
rc = init_daemon(mem_ctx, argc, argv, &error);
if (rc != 0) {
fprintf(stderr, "error [%d]: %s\n", rc, error);
exit_daemon(error, rc);
}
rc = log_all_classes(log_level);
if (rc != 0) {
fprintf(stderr, "error in log_all_classes [%d]\n", rc);
exit_daemon("logging error", rc);
}
TALLOC_FREE(mem_ctx);
return rc;
}

View File

@ -367,3 +367,11 @@ else:
deps='cmocka replace samba-util', deps='cmocka replace samba-util',
local_include=False, local_include=False,
for_selftest=True) for_selftest=True)
bld.SAMBA_BINARY('test_s4_logging',
source='tests/test_s4_logging.c',
deps=' '.join(['CMDLINE_S4',
'samba-util',
'popt']),
local_include=False,
for_selftest=True)

View File

@ -0,0 +1,373 @@
# Unix SMB/CIFS implementation.
#
# Copyright (C) Catalyst.Net Ltd. 2022
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import subprocess
import os
from samba.tests import TestCaseInTempDir
from pprint import pprint
HERE = os.path.dirname(__file__)
SERVER = os.path.join(HERE, '../../../../bin/test_s4_logging')
CLASS_LIST = ["all", "tdb", "printdrivers", "lanman", "smb",
"rpc_parse", "rpc_srv", "rpc_cli", "passdb", "sam", "auth",
"winbind", "vfs", "idmap", "quota", "acls", "locking", "msdfs",
"dmapi", "registry", "scavenger", "dns", "ldb", "tevent",
"auth_audit", "auth_json_audit", "kerberos", "drs_repl",
"smb2", "smb2_credits", "dsdb_audit", "dsdb_json_audit",
"dsdb_password_audit", "dsdb_password_json_audit",
"dsdb_transaction_audit", "dsdb_transaction_json_audit",
"dsdb_group_audit", "dsdb_group_json_audit"]
CLASS_CODES = {k: i for i, k in enumerate(CLASS_LIST)}
class S4LoggingTests(TestCaseInTempDir):
def _write_smb_conf(self,
default_level=2,
default_file="default",
mapping=()):
self.smbconf = os.path.join(self.tempdir, "smb.conf")
with open(self.smbconf, "w") as f:
f.write('[global]\n')
if default_file is not None:
dest = os.path.join(self.tempdir,
default_file)
f.write(f" log file = {dest}\n")
f.write(" log level = ")
if default_level:
f.write(f"{default_level}")
for dbg_class, log_level, log_file in mapping:
f.write(' ')
f.write(dbg_class)
if log_level is not None:
f.write(f':{log_level}')
if log_file is not None:
dest = os.path.join(self.tempdir,
log_file)
f.write(f'@{dest}')
f.write('\n')
self.addCleanup(os.unlink, self.smbconf)
def _extract_log_level_line(self, new_level=2):
# extricate the 'log level' line from the smb.conf, returning
# the value, and replacing the log level line with something
# innocuous.
smbconf2 = self.smbconf + 'new'
with open(self.smbconf) as f:
with open(smbconf2, 'w') as f2:
for line in f:
if 'log level' in line:
debug_arg = line.split('=', 1)[1].strip()
if new_level is not None:
f2.write(f' log level = {new_level}\n')
else:
f2.write(line)
os.replace(smbconf2, self.smbconf)
return debug_arg
def _get_expected_strings(self, mapping,
level_filter,
default_file='default',
file_filter=None):
default = os.path.join(self.tempdir, default_file)
expected = {default: []}
# this kind of thing:
# " logging for 'dns' [21], at level 4"
for dbg_class, log_level, log_file in mapping:
if log_file is None:
log_file = default_file
f = os.path.join(self.tempdir, log_file)
expected.setdefault(f, [])
if log_level < level_filter:
continue
if file_filter not in (None, log_file):
continue
s = (f" logging for '{dbg_class}' [{CLASS_CODES[dbg_class]}], "
f"at level {level_filter}")
expected[f].append(s)
return expected
def _run_s4_logger(self, log_level, *extra_args):
cmd = [SERVER,
'-s', self.smbconf,
'-L', str(log_level),
*extra_args]
p = subprocess.run(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.assertEqual(p.returncode, 0,
f"'{' '.join(cmd)}' failed ({p.returncode})")
return p.stdout.decode(), p.stderr.decode()
def assert_string_contains(self, string, expected_lines,
filename=None):
expected_lines = set(expected_lines)
string_lines = set(string.split('\n'))
present_lines = string_lines & expected_lines
if present_lines != expected_lines:
if filename:
print(filename)
print("expected %d lines, found %d" %
(len(expected_lines), len(present_lines)))
print("missing lines:")
pprint(expected_lines - present_lines)
raise AssertionError("missing lines")
def assert_file_contains(self, filename, expected_lines):
with open(filename) as f:
string = f.read()
self.assert_string_contains(string, expected_lines, filename)
def assert_n_known_lines_string(self, string, n):
count = string.count("logging for '")
if count != n:
raise AssertionError(
f"string has {count} lines, expected {n}")
def assert_n_known_lines(self, filename, n):
with open(filename) as f:
string = f.read()
count = string.count(" logging for '")
if count != n:
raise AssertionError(
f"{filename} has {count} lines, expected {n}")
def assert_unlink_expected_strings(self, expected_strings):
for k, v in expected_strings.items():
if not os.path.exists(k):
self.fail(f"{k} does not exist")
self.assert_file_contains(k, v)
self.assert_n_known_lines(k, len(v))
os.unlink(k)
def test_each_to_its_own(self):
level = 4
mapping = [(x, level, x) for x in CLASS_LIST]
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_all_to_one(self):
level = 4
dest = 'everything'
mapping = [(x, level, dest) for x in CLASS_LIST]
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_bifurcate(self):
level = 4
dests = ['even', 'odd']
mapping = [(x, level + 1, dests[i & 1])
for i, x in enumerate(CLASS_LIST)]
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_bifurcate_level_out_of_range(self):
# nothing will be logged, because we're logging at a too high
# level.
level = 4
dests = ['even', 'odd']
mapping = [(x, level - 1, dests[i & 1])
for i, x in enumerate(CLASS_LIST)]
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_bifurcate_misc_log_level(self):
# We are sending even numbers to default and odd numbers to
# 'odd', at various levels, depending on mod 3. Like this:
#
# log level = 2 all:5 \
# tdb:4@odd \
# printdrivers:3 \
# lanman:5@odd \
# smb:4 \
# rpc_parse:3@odd \
# rpc_srv:5 ...
#
# Therefore, 'default' should get classes that are (0 or 4) % 6
# and 'odd' should get classes that are (1 or 3) % 6.
level = 4
dests = [None, 'odd']
mapping = []
for i, x in enumerate(CLASS_LIST):
parity = i & 1
log_level = level + 1 - (i % 3)
mapping.append((x, log_level, dests[parity]))
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_all_different_ways_cmdline_d(self):
level = 4
dests = [None, 'a', 'b', 'c']
mapping = []
seed = 123
for i, x in enumerate(CLASS_LIST):
d = seed & 3
seed = seed * 17 + 1
log_level = seed % 10
seed &= 0xff
mapping.append((x, log_level, dests[d]))
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
debug_arg = self._extract_log_level_line(26)
stdout, stderr = self._run_s4_logger(level, '-d', debug_arg)
self.assert_unlink_expected_strings(expected_strings)
def test_all_different_ways_cmdline_d_interactive(self):
level = 4
dests = [None, 'a', 'b', 'c']
mapping = []
seed = 1234
for i, x in enumerate(CLASS_LIST):
d = seed & 3
seed = seed * 13 + 1
log_level = seed % 10
seed &= 0xff
mapping.append((x, log_level, dests[d]))
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
debug_arg = self._extract_log_level_line(None)
stdout, stderr = self._run_s4_logger(level, '-d', debug_arg, '-i')
expected_lines = []
for v in expected_strings.values():
# stderr doesn't end up with leading ' '
expected_lines.extend([x.strip() for x in v])
self.assert_string_contains(stderr, expected_lines)
self.assert_n_known_lines_string(stderr, len(expected_lines))
def test_only_some_level_0(self):
# running the logger with -L 0 makes the log messages run at
# level 0 (i.e DBG_ERR), so we always see them in default,
# even though smb.conf doesn't ask.
mapping = [(x, 3, ['default', 'bees']['b' in x])
for x in CLASS_LIST]
expected_strings = self._get_expected_strings(mapping, 0)
self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
stdout, stderr = self._run_s4_logger(0)
self.assert_unlink_expected_strings(expected_strings)
def test_only_some_level_3(self):
# here, we're expecting the unmentioned non-b classes to just
# disappear.
level = 3
mapping = [(x, level, 'bees') for x in CLASS_LIST if 'b' in x]
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=[x for x in mapping if x[2] == 'bees'])
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_none(self):
level = 4
mapping = []
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_none_high_default(self):
# We set the default level to 5 and do nothing else special,
# which means we need a different mapping for the smb.conf
# than the expected strings.
level = 4
mapping = [(x, 5, 'default') for x in CLASS_LIST]
expected_strings = self._get_expected_strings(mapping, level)
# note the empty mapping in smb.conf
self._write_smb_conf(mapping=[], default_level=5)
stdout, stderr = self._run_s4_logger(level)
self.assert_unlink_expected_strings(expected_strings)
def test_none_high_cmdline_d(self):
# We set the default level to 2, but run the 'server' with -d 10.
level = 4
mapping = [(x, 10, 'default') for x in CLASS_LIST]
expected_strings = self._get_expected_strings(mapping, level)
# note the empty mapping in smb.conf
self._write_smb_conf(mapping=[])
stdout, stderr = self._run_s4_logger(level, '-d', '10')
self.assert_unlink_expected_strings(expected_strings)
def test_interactive_high_default_simple(self):
# running with -i should send everything to stderr.
level = 4
mapping = [(x, 5, 'default') for x in CLASS_LIST]
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=[], default_level=5)
stdout, stderr = self._run_s4_logger(level, '-i')
expected_lines = []
for v in expected_strings.values():
# stderr doesn't end up with leading ' '
expected_lines.extend([x.strip() for x in v])
self.assert_string_contains(stderr, expected_lines)
def test_interactive_complex_smb_conf(self):
# running with -i should send everything to stderr. The
# smb.conf will set the levels, but the target files are
# overridden.
# (this is the test_bifurcate_misc_log_level() smb.conf).
level = 4
dests = [None, 'odd']
mapping = []
for i, x in enumerate(CLASS_LIST):
parity = i & 1
log_level = level + 1 - (i % 3)
mapping.append((x, log_level, dests[parity]))
expected_strings = self._get_expected_strings(mapping, level)
self._write_smb_conf(mapping=mapping)
stdout, stderr = self._run_s4_logger(level, '-i')
expected_lines = []
for v in expected_strings.values():
# stderr doesn't end up with leading ' '
expected_lines.extend([x.strip() for x in v])
self.assert_string_contains(stderr, expected_lines)

View File

@ -92,6 +92,7 @@ planpythontestsuite("none", "samba.tests.s3idmapdb")
planpythontestsuite("none", "samba.tests.samba3sam") planpythontestsuite("none", "samba.tests.samba3sam")
planpythontestsuite("none", "samba.tests.dsdb_api") planpythontestsuite("none", "samba.tests.dsdb_api")
planpythontestsuite("none", "samba.tests.smbconf") planpythontestsuite("none", "samba.tests.smbconf")
planpythontestsuite("none", "samba.tests.logfiles")
planpythontestsuite( planpythontestsuite(
"none", "wafsamba.tests.test_suite", "none", "wafsamba.tests.test_suite",
extra_path=[os.path.join(samba4srcdir, "..", "buildtools"), extra_path=[os.path.join(samba4srcdir, "..", "buildtools"),