1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-27 22:50:26 +03:00

python/blackbox: add rpcd_witness_samba_only.py test

This tests the witness service and its interaction with
ctdb.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Günther Deschner <gd@samba.org>
This commit is contained in:
Stefan Metzmacher 2024-01-10 15:11:24 +01:00
parent b3c51c4b82
commit b17e090e7c
4 changed files with 474 additions and 0 deletions

View File

@ -0,0 +1,459 @@
#!/usr/bin/env python3
# Unix SMB/CIFS implementation.
#
# Copyright © 2024 Stefan Metzmacher <metze@samba.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
import os
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
import json
import samba.tests
from samba.credentials import Credentials
from samba.ndr import ndr_print
from samba.dcerpc import witness
from samba.tests import DynamicTestCase, BlackboxTestCase
from samba.common import get_string
from samba import werror, WERRORError
@DynamicTestCase
class RpcdWitnessSambaTests(BlackboxTestCase):
@classmethod
def setUpDynamicTestCases(cls):
cls.num_nodes = int(samba.tests.env_get_var_value('NUM_NODES'))
def _define_tests(idx1, idx2, ndr64=False):
cls._define_GetInterfaceList_test(idx1, idx2, ndr64)
if idx1 == 0 and idx2 != -1:
cls._define_ResourceChangeCTDB_tests(idx1, idx2, ndr64)
for idx1 in range(0, cls.num_nodes):
_define_tests(idx1, -1, ndr64=False)
_define_tests(idx1, -1, ndr64=True)
for idx2 in range(0, cls.num_nodes):
_define_tests(idx1, idx2, ndr64=False)
_define_tests(idx1, idx2, ndr64=True)
def setUp(self):
super().setUp()
# ctdb/tests/local_daemons.sh doesn't like CTDB_SOCKET to be set already
# and it doesn't need CTDB_BASE, so we stash them away
self.saved_CTDB_SOCKET = samba.tests.env_get_var_value('CTDB_SOCKET',
allow_missing=True)
if self.saved_CTDB_SOCKET is not None:
del os.environ["CTDB_SOCKET"]
self.saved_CTDB_BASE = samba.tests.env_get_var_value('CTDB_BASE',
allow_missing=True)
if self.saved_CTDB_BASE is not None:
del os.environ["CTDB_BASE"]
self.disabled_idx = -1
# set this to True in order to get verbose output
self.verbose = False
self.ctdb_prefix = samba.tests.env_get_var_value('CTDB_PREFIX')
self.cluster_share = samba.tests.env_get_var_value('CLUSTER_SHARE')
self.lp = self.get_loadparm(s3=True)
self.remote_domain = samba.tests.env_get_var_value('DOMAIN')
self.remote_user = samba.tests.env_get_var_value('USERNAME')
self.remote_password = samba.tests.env_get_var_value('PASSWORD')
self.remote_creds = Credentials()
self.remote_creds.guess(self.lp)
self.remote_creds.set_username(self.remote_user)
self.remote_creds.set_domain(self.remote_domain)
self.remote_creds.set_password(self.remote_password)
self.server_hostname = samba.tests.env_get_var_value('SERVER_HOSTNAME')
self.interface_group_name = samba.tests.env_get_var_value('INTERFACE_GROUP_NAME')
common_binding_args = "spnego,sign,target_hostname=%s" % (
self.server_hostname)
if self.verbose:
common_binding_args += ",print"
common_binding_args32 = common_binding_args
common_binding_args64 = common_binding_args + ",ndr64"
self.nodes = []
for node_idx in range(0, self.num_nodes):
node = {}
name_var = 'CTDB_SERVER_NAME_NODE%u' % node_idx
node["name"] = samba.tests.env_get_var_value(name_var)
ip_var = 'CTDB_IFACE_IP_NODE%u' % node_idx
node["ip"] = samba.tests.env_get_var_value(ip_var)
node["binding_string32"] = "ncacn_ip_tcp:%s[%s]" % (
node["ip"], common_binding_args32)
node["binding_string64"] = "ncacn_ip_tcp:%s[%s]" % (
node["ip"], common_binding_args64)
self.nodes.append(node)
def tearDown(self):
if self.disabled_idx != -1:
self.enable_node(self.disabled_idx)
if self.saved_CTDB_SOCKET is not None:
os.environ["CTDB_SOCKET"] = self.saved_CTDB_SOCKET
self.saved_CTDB_SOCKET = None
if self.saved_CTDB_BASE is not None:
os.environ["CTDB_BASE"] = self.saved_CTDB_BASE
self.saved_CTDB_BASE = None
super().tearDown()
def call_onnode(self, nodes, cmd):
COMMAND = "ctdb/tests/local_daemons.sh"
argv = "%s '%s' onnode %s '%s'" % (COMMAND, self.ctdb_prefix, nodes, cmd)
try:
if self.verbose:
print("Calling: %s" % argv)
out = self.check_output(argv)
except samba.tests.BlackboxProcessError as e:
self.fail("Error calling [%s]: %s" % (argv, e))
out_str = get_string(out)
return out_str
def dump_ctdb_status_all(self):
for node_idx in range(0, self.num_nodes):
print("%s" % self.call_onnode(str(node_idx), "ctdb status"))
def disable_node(self, node_idx, dump_status=False):
if dump_status:
self.dump_ctdb_status_all()
self.assertEqual(self.disabled_idx, -1)
self.call_onnode(str(node_idx), "ctdb disable")
self.disabled_idx = node_idx
if dump_status:
self.dump_ctdb_status_all()
def enable_node(self, node_idx, dump_status=False):
if dump_status:
self.dump_ctdb_status_all()
self.assertEqual(self.disabled_idx, node_idx)
self.call_onnode(str(node_idx), "ctdb enable")
self.disabled_idx = -1
if dump_status:
self.dump_ctdb_status_all()
@classmethod
def _define_GetInterfaceList_test(cls, conn_idx, disable_idx, ndr64=False):
if disable_idx != -1:
disable_name = "%u_disabled" % disable_idx
else:
disable_name = "all_enabled"
if ndr64:
ndr_name = "NDR64"
else:
ndr_name = "NDR32"
name = "Node%u_%s_%s" % (conn_idx, disable_name, ndr_name)
args = {
'conn_idx': conn_idx,
'disable_idx': disable_idx,
'ndr64': ndr64,
}
cls.generate_dynamic_test('test_GetInterfaceList', name, args)
def _test_GetInterfaceList_with_args(self, args):
conn_idx = args.pop('conn_idx')
disable_idx = args.pop('disable_idx')
ndr64 = args.pop('ndr64')
self.assertEqual(len(args.keys()), 0)
conn_node = self.nodes[conn_idx]
if ndr64:
binding_string = conn_node["binding_string64"]
else:
binding_string = conn_node["binding_string32"]
if disable_idx != -1:
self.disable_node(disable_idx)
conn = witness.witness(binding_string, self.lp, self.remote_creds)
interface_list = conn.GetInterfaceList()
if disable_idx != -1:
self.enable_node(disable_idx)
self.assertIsNotNone(interface_list)
self.assertEqual(interface_list.num_interfaces, len(self.nodes))
for idx in range(0, interface_list.num_interfaces):
iface = interface_list.interfaces[idx]
node = self.nodes[idx]
expected_flags = 0
expected_flags |= witness.WITNESS_INFO_IPv4_VALID
if conn_idx != idx:
expected_flags |= witness.WITNESS_INFO_WITNESS_IF
if disable_idx == idx:
expected_state = witness.WITNESS_STATE_UNAVAILABLE
else:
expected_state = witness.WITNESS_STATE_AVAILABLE
self.assertIsNotNone(iface.group_name)
self.assertEqual(iface.group_name, self.interface_group_name)
self.assertEqual(iface.version, witness.WITNESS_V2)
self.assertEqual(iface.state, expected_state)
self.assertIsNotNone(iface.ipv4)
self.assertEqual(iface.ipv4, node["ip"])
self.assertIsNotNone(iface.ipv6)
self.assertEqual(iface.ipv6,
"0000:0000:0000:0000:0000:0000:0000:0000")
self.assertEqual(iface.flags, expected_flags)
def assertResourceChanges(self, response, expected_resource_changes):
self.assertIsNotNone(response)
self.assertEqual(response.type,
witness.WITNESS_NOTIFY_RESOURCE_CHANGE)
self.assertEqual(response.num, len(expected_resource_changes))
self.assertEqual(len(response.messages), len(expected_resource_changes))
for ri in range(0, len(expected_resource_changes)):
expected_resource_change = expected_resource_changes[ri]
resource_change = response.messages[ri]
self.assertIsNotNone(resource_change)
expected_type = witness.WITNESS_RESOURCE_STATE_UNAVAILABLE
expected_type = expected_resource_change.get('type', expected_type)
expected_name = expected_resource_change.get('name')
self.assertEqual(resource_change.type, expected_type)
self.assertIsNotNone(resource_change.name)
self.assertEqual(resource_change.name, expected_name)
def assertResourceChange(self, response, expected_type, expected_name):
expected_resource_change = {
'type': expected_type,
'name': expected_name,
}
expected_resource_changes = [expected_resource_change]
self.assertResourceChanges(response, expected_resource_changes)
@classmethod
def _define_ResourceChangeCTDB_tests(cls, conn_idx, monitor_idx, ndr64=False):
if ndr64:
ndr_name = "NDR64"
else:
ndr_name = "NDR32"
name_suffix = "WNode%u_RNode%u_%s" % (conn_idx, monitor_idx, ndr_name)
base_args = {
'conn_idx': conn_idx,
'monitor_idx': monitor_idx,
'ndr64': ndr64,
}
name = "v1_disabled_after_%s" % name_suffix
args = base_args.copy()
args['reg_v1'] = True
args['disable_after_reg'] = True
args['explicit_unregister'] = False
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
name = "v1_disabled_after_enabled_after_%s" % name_suffix
args = base_args.copy()
args['reg_v1'] = True
args['disable_after_reg'] = True
args['enable_after_reg'] = True
args['explicit_unregister'] = False
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
name = "v2_disabled_before_enable_after_%s" % name_suffix
args = base_args.copy()
args['disable_before_reg'] = True
args['enable_after_reg'] = True
args['wait_for_timeout'] = True
args['timeout'] = 6
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
name = "v2_disabled_after_%s" % name_suffix
args = base_args.copy()
args['disable_after_reg'] = True
args['wait_for_not_found'] = True
args['explicit_unregister'] = False
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
name = "v2_disabled_after_enabled_after_%s" % name_suffix
args = base_args.copy()
args['disable_after_reg'] = True
args['enable_after_reg'] = True
args['wait_for_not_found'] = True
args['explicit_unregister'] = False
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
name = "share_v2_disabled_before_enable_after_%s" % name_suffix
args = base_args.copy()
args['share_reg'] = True
args['disable_before_reg'] = True
args['enable_after_reg'] = True
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
name = "share_v2_disabled_after_%s" % name_suffix
args = base_args.copy()
args['share_reg'] = True
args['disable_after_reg'] = True
args['explicit_unregister'] = False
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
name = "share_v2_disabled_after_enabled_after_%s" % name_suffix
args = base_args.copy()
args['share_reg'] = True
args['disable_after_reg'] = True
args['enable_after_reg'] = True
args['explicit_unregister'] = False
cls.generate_dynamic_test('test_ResourceChangeCTDB', name, args)
def _test_ResourceChangeCTDB_with_args(self, args):
conn_idx = args.pop('conn_idx')
monitor_idx = args.pop('monitor_idx')
ndr64 = args.pop('ndr64')
timeout = int(args.pop('timeout', 15))
reg_v1 = args.pop('reg_v1', False)
share_reg = args.pop('share_reg', False)
disable_before_reg = args.pop('disable_before_reg', False)
disable_after_reg = args.pop('disable_after_reg', False)
enable_after_reg = args.pop('enable_after_reg', False)
explicit_unregister = args.pop('explicit_unregister', True)
wait_for_not_found = args.pop('wait_for_not_found', False)
wait_for_timeout = args.pop('wait_for_timeout', False)
self.assertEqual(len(args.keys()), 0)
conn_node = self.nodes[conn_idx]
if ndr64:
binding_string = conn_node["binding_string64"]
else:
binding_string = conn_node["binding_string32"]
monitor_node = self.nodes[monitor_idx]
computer_name = "test-rpcd-witness-samba-only-client-computer"
conn = witness.witness(binding_string, self.lp, self.remote_creds)
if disable_before_reg:
self.assertFalse(disable_after_reg)
self.disable_node(monitor_idx)
if reg_v1:
self.assertFalse(wait_for_timeout)
self.assertFalse(share_reg)
reg_context = conn.Register(witness.WITNESS_V1,
self.server_hostname,
monitor_node["ip"],
computer_name)
else:
if share_reg:
share_name = self.cluster_share
else:
share_name = None
reg_context = conn.RegisterEx(witness.WITNESS_V2,
self.server_hostname,
share_name,
monitor_node["ip"],
computer_name,
witness.WITNESS_REGISTER_NONE,
timeout)
if disable_after_reg:
self.assertFalse(disable_before_reg)
self.disable_node(monitor_idx)
if enable_after_reg:
self.enable_node(monitor_idx)
if disable_after_reg:
response_unavailable = conn.AsyncNotify(reg_context)
self.assertResourceChange(response_unavailable,
witness.WITNESS_RESOURCE_STATE_UNAVAILABLE,
monitor_node["ip"])
if enable_after_reg:
response_available = conn.AsyncNotify(reg_context)
self.assertResourceChange(response_available,
witness.WITNESS_RESOURCE_STATE_AVAILABLE,
monitor_node["ip"])
if wait_for_timeout:
self.assertFalse(wait_for_not_found)
self.assertFalse(disable_after_reg)
try:
_ = conn.AsyncNotify(reg_context)
self.fail()
except WERRORError as e:
(num, string) = e.args
if num != werror.WERR_TIMEOUT:
raise
if wait_for_not_found:
self.assertFalse(wait_for_timeout)
self.assertTrue(disable_after_reg)
self.assertFalse(explicit_unregister)
try:
_ = conn.AsyncNotify(reg_context)
self.fail()
except WERRORError as e:
(num, string) = e.args
if num != werror.WERR_NOT_FOUND:
raise
if not explicit_unregister:
return
conn.UnRegister(reg_context)
try:
_ = conn.AsyncNotify(reg_context)
self.fail()
except WERRORError as e:
(num, string) = e.args
if num != werror.WERR_NOT_FOUND:
raise
try:
conn.UnRegister(reg_context)
self.fail()
except WERRORError as e:
(num, string) = e.args
if num != werror.WERR_NOT_FOUND:
raise
if __name__ == "__main__":
import unittest
unittest.main()

View File

@ -98,11 +98,13 @@ EXCLUDE_DIRS = {
'source4/dsdb/tests/python',
'bin/ab',
'bin/python/samba/tests',
'bin/python/samba/tests/blackbox',
'bin/python/samba/tests/dcerpc',
'bin/python/samba/tests/krb5',
'bin/python/samba/tests/ndr',
'python/samba/tests',
'python/samba/tests/bin',
'python/samba/tests/blackbox',
'python/samba/tests/dcerpc',
'python/samba/tests/krb5',
'python/samba/tests/ndr',

View File

@ -541,6 +541,7 @@ sub setup_clusteredmember
server signing = on
clustering = yes
rpc start on demand helpers = false
rpcd witness:include node ips = yes
ctdbd socket = ${socket}
include = registry
dbwrap_tdb_mutexes:* = yes

View File

@ -2168,3 +2168,15 @@ planoldpythontestsuite("none", "samba.tests.usage")
planpythontestsuite("fileserver", "samba.tests.dcerpc.mdssvc")
planoldpythontestsuite("none", "samba.tests.compression")
planpythontestsuite("none", "samba.tests.security_descriptors")
if have_cluster_support:
cluster_environ = {
"SERVER_HOSTNAME": "$NETBIOSNAME",
"INTERFACE_GROUP_NAME": "$NETBIOSNAME",
"CLUSTER_SHARE": "registry_share",
"USERNAME": "$DC_USERNAME",
"PASSWORD": "$DC_PASSWORD",
}
planpythontestsuite("clusteredmember:local",
"samba.tests.blackbox.rpcd_witness_samba_only",
environ=cluster_environ)