From 98d908bfd07283878a7a6a630c2bfe5d27b5ffd8 Mon Sep 17 00:00:00 2001 From: Stefan Metzmacher Date: Mon, 23 Sep 2024 15:13:59 +0200 Subject: [PATCH] tests/dcerpc/raw_protocol: pass against Windows 2022 and require special env vars for legacy servers Test works against Windows 2022 and works like this: SMB_CONF_PATH=/dev/null SERVER=172.31.9.118 \ TARGET_HOSTNAME=w2022-118.w2022-l7.base IGNORE_RANDOM_PAD=1 \ DOMAIN=W2022-L7 REALM=W2022-L7.BASE \ USERNAME=administrator PASSWORD=A1b2C3d4 \ python/samba/tests/dcerpc/raw_protocol.py -v -f TestDCERPC_BIND Against a legacy Windows2012R2 server this still works: SMB_CONF_PATH=/dev/null SERVER=172.31.9.188 \ TARGET_HOSTNAME=w2012r2-188.w2012r2-l6.base ALLOW_BIND_AUTH_PAD=1 \ LEGACY_BIND_NACK_NO_REASON=1 AUTH_LEVEL_CONNECT_LSA=1 \ IGNORE_RANDOM_PAD=1 DOMAIN=W2012R2-L6 REALM=W2012R2-L6.BASE \ USERNAME=administrator PASSWORD=A1b2C3d4 \ python/samba/tests/dcerpc/raw_protocol.py -v -f TestDCERPC_BIND Currently Samba behaves like 2012R2, but the next commits will change that... BUG: https://bugzilla.samba.org/show_bug.cgi?id=14356 Signed-off-by: Stefan Metzmacher Reviewed-by: Andreas Schneider --- python/samba/tests/dcerpc/raw_protocol.py | 206 ++++++++++++++++++++-- python/samba/tests/dcerpc/raw_testcase.py | 6 + source4/selftest/tests.py | 3 + 3 files changed, 198 insertions(+), 17 deletions(-) diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index fa5a042b851..d5cd10d724a 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -358,13 +358,23 @@ class TestDCERPC_BIND(RawDCERPCTest): rep = self.recv_pdu() self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, auth_length=0) - self.assertEqual(rep.u.reject_reason, - dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED) + if self.legacy_bind_nak_no_reason: + # legacy e.g. w2012r2 + expected_reject_reason = dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED + else: + # modern (e.g. w2022) + expected_reject_reason = dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED + self.assertEqual(rep.u.reject_reason, expected_reject_reason) self.assertEqual(rep.u.num_versions, 1) self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) self.assertPadding(rep.u._pad, 3) + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + def test_invalid_auth_noctx(self): req = self.generate_bind(call_id=0) req.auth_length = dcerpc.DCERPC_AUTH_TRAILER_LENGTH @@ -378,6 +388,10 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) self.assertPadding(rep.u._pad, 3) + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() def test_no_auth_valid_valid_request(self): ndr32 = base.transfer_syntax_ndr() @@ -440,8 +454,13 @@ class TestDCERPC_BIND(RawDCERPCTest): rep = self.recv_pdu() self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, auth_length=0) - self.assertEqual(rep.u.reject_reason, - dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED) + if self.legacy_bind_nak_no_reason: + # legacy e.g. w2012r2 + expected_reject_reason = dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED + else: + # modern (e.g. w2022) + expected_reject_reason = dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED + self.assertEqual(rep.u.reject_reason, expected_reject_reason) self.assertEqual(rep.u.num_versions, 1) self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) @@ -5200,6 +5219,21 @@ class TestDCERPC_BIND(RawDCERPCTest): auth_info=auth_info) self.send_pdu(req) rep = self.recv_pdu() + if not self.allow_bind_auth_pad: + # modern server (e.g. 2022) + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEqual(rep.u.reject_reason, + dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED) + self.assertEqual(rep.u.num_versions, 1) + self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertPadding(rep.u._pad, 3) + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) @@ -5414,6 +5448,21 @@ class TestDCERPC_BIND(RawDCERPCTest): auth_info=auth_info) self.send_pdu(req) rep = self.recv_pdu() + if not self.allow_bind_auth_pad: + # modern server (e.g. 2022) + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEqual(rep.u.reject_reason, + dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED) + self.assertEqual(rep.u.num_versions, 1) + self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertPadding(rep.u._pad, 3) + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) @@ -5526,6 +5575,21 @@ class TestDCERPC_BIND(RawDCERPCTest): auth_info=auth_info) self.send_pdu(req) rep = self.recv_pdu() + if not self.allow_bind_auth_pad: + # modern server (e.g. 2022) + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEqual(rep.u.reject_reason, + dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED) + self.assertEqual(rep.u.num_versions, 1) + self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertPadding(rep.u._pad, 3) + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) @@ -5645,6 +5709,21 @@ class TestDCERPC_BIND(RawDCERPCTest): auth_info=auth_info) self.send_pdu(req) rep = self.recv_pdu() + if not self.allow_bind_auth_pad: + # modern server (e.g. 2022) + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEqual(rep.u.reject_reason, + dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED) + self.assertEqual(rep.u.num_versions, 1) + self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertPadding(rep.u._pad, 3) + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) @@ -6630,6 +6709,15 @@ class TestDCERPC_BIND(RawDCERPCTest): ctx=ctx1, auth_context=auth_context1) + # + # Note modern servers don't allow + # DCERPC_AUTH_LEVEL_CONNECT for lsa anymore + # + if self.auth_level_connect_lsa: + expected_fault_status = None + else: + expected_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + # # With just one explicit auth context and that # uses AUTH_LEVEL_CONNECT context. @@ -6637,7 +6725,14 @@ class TestDCERPC_BIND(RawDCERPCTest): # We always get that by default instead of the one default one # inherited from the transport # - self.do_single_request(call_id=1, ctx=ctx1, io=get_user_name) + self.do_single_request(call_id=1, ctx=ctx1, io=get_user_name, + fault_status=expected_fault_status) + if not self.auth_level_connect_lsa: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) self.assertEqualsStrLower(get_user_name.out_account_name, account_name1) self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name1) @@ -6764,6 +6859,15 @@ class TestDCERPC_BIND(RawDCERPCTest): assoc_group_id = ack0.u.assoc_group_id, start_with_alter=True) + # + # Note modern servers don't allow + # DCERPC_AUTH_LEVEL_CONNECT for lsa anymore + # + if self.auth_level_connect_lsa: + expected_fault_status = None + else: + expected_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + # # With just one explicit auth context and that # uses AUTH_LEVEL_CONNECT context. @@ -6771,7 +6875,14 @@ class TestDCERPC_BIND(RawDCERPCTest): # We always get that by default instead of the one default one # inherited from the transport # - self.do_single_request(call_id=3, ctx=ctx1, io=get_user_name) + self.do_single_request(call_id=3, ctx=ctx1, io=get_user_name, + fault_status=expected_fault_status) + if not self.auth_level_connect_lsa: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) self.assertEqualsStrLower(get_user_name.out_account_name, account_name1) self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name1) @@ -6898,6 +7009,15 @@ class TestDCERPC_BIND(RawDCERPCTest): assoc_group_id = ack0.u.assoc_group_id, start_with_alter=True) + # + # Note modern servers don't allow + # DCERPC_AUTH_LEVEL_CONNECT for lsa anymore + # + if self.auth_level_connect_lsa: + expected_fault_status = None + else: + expected_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + # # With just one explicit auth context and that # uses AUTH_LEVEL_CONNECT context. @@ -6907,7 +7027,14 @@ class TestDCERPC_BIND(RawDCERPCTest): # # Until an explicit usage resets that mode # - self.do_single_request(call_id=3, ctx=ctx1, io=get_user_name) + self.do_single_request(call_id=3, ctx=ctx1, io=get_user_name, + fault_status=expected_fault_status) + if not self.auth_level_connect_lsa: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) self.assertEqualsStrLower(get_user_name.out_account_name, account_name1) self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name1) @@ -7064,6 +7191,15 @@ class TestDCERPC_BIND(RawDCERPCTest): assoc_group_id = ack0.u.assoc_group_id, start_with_alter=True) + # + # Note modern servers don't allow + # DCERPC_AUTH_LEVEL_CONNECT for lsa anymore + # + if self.auth_level_connect_lsa: + expected_fault_status = None + else: + expected_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + # # With just one explicit auth context and that # uses AUTH_LEVEL_CONNECT context. @@ -7073,7 +7209,14 @@ class TestDCERPC_BIND(RawDCERPCTest): # # Until a new explicit context resets the mode # - self.do_single_request(call_id=3, ctx=ctx1, io=get_user_name) + self.do_single_request(call_id=3, ctx=ctx1, io=get_user_name, + fault_status=expected_fault_status) + if not self.auth_level_connect_lsa: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) self.assertEqualsStrLower(get_user_name.out_account_name, account_name1) self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name1) @@ -7089,10 +7232,26 @@ class TestDCERPC_BIND(RawDCERPCTest): assoc_group_id = ack0.u.assoc_group_id, start_with_alter=True) + # + # Note modern servers don't allow + # DCERPC_AUTH_LEVEL_CONNECT for lsa anymore + # + if self.auth_level_connect_lsa: + expected_fault_status = None + else: + expected_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + # # A new auth context with LEVEL_CONNECT resets the default. # - self.do_single_request(call_id=6, ctx=ctx1, io=get_user_name) + self.do_single_request(call_id=6, ctx=ctx1, io=get_user_name, + fault_status=expected_fault_status) + if not self.auth_level_connect_lsa: + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) self.assertEqualsStrLower(get_user_name.out_account_name, account_name2) self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name2) @@ -7265,6 +7424,15 @@ class TestDCERPC_BIND(RawDCERPCTest): ctx=ctx1, auth_context=auth_context1) + # + # Note modern servers don't allow + # DCERPC_AUTH_LEVEL_CONNECT for lsa anymore + # + if self.auth_level_connect_lsa: + expected_fault_status = None + else: + expected_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + # # With just one explicit auth context and that # *not* uses AUTH_LEVEL_CONNECT context. @@ -7336,15 +7504,19 @@ class TestDCERPC_BIND(RawDCERPCTest): # # Until an explicit usage of any auth context reset that mode. # - self.do_single_request(call_id=10, ctx=ctx1, io=get_user_name) - self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) - self.assertEqualsStrLower(get_user_name.out_account_name, account_name3) - self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name3) + self.do_single_request(call_id=10, ctx=ctx1, io=get_user_name, + fault_status=expected_fault_status) + if self.auth_level_connect_lsa: + self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) + self.assertEqualsStrLower(get_user_name.out_account_name, account_name3) + self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name3) - self.do_single_request(call_id=11, ctx=ctx1, io=get_user_name) - self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) - self.assertEqualsStrLower(get_user_name.out_account_name, account_name3) - self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name3) + self.do_single_request(call_id=11, ctx=ctx1, io=get_user_name, + fault_status=expected_fault_status) + if self.auth_level_connect_lsa: + self.assertEqual(get_user_name.result[0], NT_STATUS_SUCCESS) + self.assertEqualsStrLower(get_user_name.out_account_name, account_name3) + self.assertEqualsStrLower(get_user_name.out_authority_name.value, authority_name3) self.do_single_request(call_id=12, ctx=ctx1, io=get_user_name, auth_context=auth_context1) diff --git a/python/samba/tests/dcerpc/raw_testcase.py b/python/samba/tests/dcerpc/raw_testcase.py index f3a4cdff459..c2871909abc 100644 --- a/python/samba/tests/dcerpc/raw_testcase.py +++ b/python/samba/tests/dcerpc/raw_testcase.py @@ -162,6 +162,12 @@ class RawDCERPCTest(TestCase): self.ignore_random_pad = samba.tests.env_get_var_value('IGNORE_RANDOM_PAD', allow_missing=True) + self.auth_level_connect_lsa = samba.tests.env_get_var_value('AUTH_LEVEL_CONNECT_LSA', + allow_missing=True) + self.allow_bind_auth_pad = samba.tests.env_get_var_value('ALLOW_BIND_AUTH_PAD', + allow_missing=True) + self.legacy_bind_nak_no_reason = samba.tests.env_get_var_value('LEGACY_BIND_NACK_NO_REASON', + allow_missing=True) self.host = samba.tests.env_get_var_value('SERVER') self.target_hostname = samba.tests.env_get_var_value('TARGET_HOSTNAME', allow_missing=True) if self.target_hostname is None: diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index 0f49c165a4d..c4f98019646 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -1371,6 +1371,9 @@ planoldpythontestsuite("ad_dc", "samba.tests.dcerpc.dnsserver", extra_args=['-U" for env in ["chgdcpass", "ad_member"]: planoldpythontestsuite(env, "samba.tests.dcerpc.raw_protocol", environ={"MAX_NUM_AUTH": "8", + "ALLOW_BIND_AUTH_PAD": "1", + "AUTH_LEVEL_CONNECT_LSA": "1", + "LEGACY_BIND_NACK_NO_REASON": "1", "USERNAME": "$DC_USERNAME", "PASSWORD": "$DC_PASSWORD"})