diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index c25c17c351a..95b3533cfad 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -33,6 +33,7 @@ import samba.dcerpc.lsa import struct from samba import gensec from samba.tests.dcerpc.raw_testcase import RawDCERPCTest +from samba.tests import DynamicTestCase from samba.ntstatus import ( NT_STATUS_SUCCESS ) @@ -41,6 +42,7 @@ global_ndr_print = False global_hexdump = False +@DynamicTestCase class TestDCERPC_BIND(RawDCERPCTest): def setUp(self): @@ -48,6 +50,11 @@ class TestDCERPC_BIND(RawDCERPCTest): self.do_ndr_print = global_ndr_print self.do_hexdump = global_hexdump + @classmethod + def setUpDynamicTestCases(cls): + cls._setup_auth_pad_ignored() + return + def _test_no_auth_request_bind_pfc_flags(self, req_pfc_flags, rep_pfc_flags): ndr32 = base.transfer_syntax_ndr() @@ -5188,6 +5195,536 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertIsNone(rep) self.assertNotConnected() + def _prepare_pdu_auth_padding(self, req, before=b"", behind=b"", max_xmit_frag=None): + if max_xmit_frag is None: + max_xmit_frag = self.max_xmit_frag + max_pad_length = max_xmit_frag - req.frag_length + total_pad_length = len(before) + len(behind) + self.assertLessEqual(total_pad_length, max_pad_length) + + req_pdu_tmp = self.prepare_pdu(req) + + auth_length = dcerpc.DCERPC_AUTH_TRAILER_LENGTH + req.auth_length + auth_offset = req.frag_length - auth_length + auth_length = dcerpc.DCERPC_AUTH_TRAILER_LENGTH + req.auth_length + len(behind) + new_len_fields = struct.pack(' req.auth_length: + max_behind_len = max_auth_length - req.auth_length + else: + max_behind_len = 0 + if behind_len > max_behind_len: + behind_len = max_behind_len + req_pdu = self._prepare_pdu_auth_padding(req, + before=b'\xfb' * before_len, + behind=b'\xfa' * behind_len, + max_xmit_frag=max_xmit_frag) + return req_pdu + + def _get_pdu_auth_len(self, req_pdu): + al = req_pdu[dcerpc.DCERPC_AUTH_LEN_OFFSET:dcerpc.DCERPC_AUTH_LEN_OFFSET+2] + l = struct.unpack(' max_auth_length: + bind_nak_reason = dcerpc.DCERPC_BIND_NAK_REASON_INVALID_CHECKSUM + self.send_pdu_blob(req_pdu) + rep = self.recv_pdu() + if bind_nak_reason is not None: + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEqual(rep.u.reject_reason, bind_nak_reason) + self.assertEqual(rep.u.num_versions, 1) + self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertPadding(rep.u._pad, 3) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) + self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id) + sda_str = self.secondary_address + sda_len = len(sda_str) + 1 + mod_len = (2 + sda_len) % 4 + if mod_len != 0: + sda_pad = 4 - mod_len + else: + sda_pad = 0 + self.assertEqual(rep.u.secondary_address_size, sda_len) + self.assertEqual(rep.u.secondary_address, sda_str) + self.assertPadding(rep.u._pad1, sda_pad) + self.assertEqual(rep.u.num_results, 1) + self.assertEqual(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEqual(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertNotEqual(len(rep.u.auth_info), 0) + a = self.parse_auth(rep.u.auth_info) + + from_server = a.credentials + (finished, to_server) = g.update(from_server) + + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_blob=to_server) + if expect_3legs: + self.assertTrue(finished) + else: + self.assertFalse(finished) + if not use_auth3: + req = self.generate_alter(call_id=2, + ctx_list=ctx_list, + assoc_group_id=rep.u.assoc_group_id, + auth_info=auth_info) + req_pdu = self._add_auth_padding(req, + mid_padding=third_mid_padding, + tail_padding=third_tail_padding, + auth_offset_diff=third_auth_offset_diff, + max_auth_length=third_max_auth_length, + max_xmit_frag=0xffff) + auth_length = self._get_pdu_auth_len(req_pdu) + if third_auth_offset_diff != 0: + third_fault_status = dcerpc.DCERPC_NCA_S_PROTO_ERROR + third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE + if auth_length > max_auth_length: + third_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE + self.send_pdu_blob(req_pdu) + rep = self.recv_pdu() + if third_fault_status is not None: + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | third_fault_flags, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, 0) + self.assertEqual(rep.u.cancel_count, 0) + self.assertEqual(rep.u.flags, 0) + self.assertEqual(rep.u.status, third_fault_status) + self.assertEqual(rep.u.reserved, 0) + self.assertEqual(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + pfc_flags=req.pfc_flags) + self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertEqual(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEqual(rep.u.secondary_address_size, 0) + self.assertEqual(rep.u.secondary_address, "") + self.assertPadding(rep.u._pad1, 2) + self.assertEqual(rep.u.num_results, 1) + self.assertEqual(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEqual(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + if finished: + self.assertEqual(rep.auth_length, 0) + else: + self.assertNotEqual(rep.auth_length, 0) + self.assertGreater(len(rep.u.auth_info), dcerpc.DCERPC_AUTH_TRAILER_LENGTH) + self.assertEqual(rep.auth_length, len(rep.u.auth_info) - dcerpc.DCERPC_AUTH_TRAILER_LENGTH) + else: + req = self.generate_auth3(call_id=2, + auth_info=auth_info) + req_pdu = self._add_auth_padding(req, + mid_padding=third_mid_padding, + tail_padding=third_tail_padding, + auth_offset_diff=third_auth_offset_diff, + max_auth_length=third_max_auth_length) + auth_length = self._get_pdu_auth_len(req_pdu) + if third_auth_offset_diff != 0: + third_fault_status = dcerpc.DCERPC_NCA_S_PROTO_ERROR + third_fault_flags = dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE + elif not expect_3legs: + pass + elif auth_length > max_auth_length: + first_req_fault_status = dcerpc.DCERPC_FAULT_ACCESS_DENIED + self.send_pdu_blob(req_pdu) + if third_fault_status is not None: + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | third_fault_flags, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, 0) + self.assertEqual(rep.u.cancel_count, 0) + self.assertEqual(rep.u.flags, 0) + self.assertEqual(rep.u.status, third_fault_status) + self.assertEqual(rep.u.reserved, 0) + self.assertEqual(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + if self.transport_impersonation is None: + rep = self.recv_pdu(timeout=0.01) + self.assertIsNone(rep) + self.assertIsConnected() + + # And now try a request without auth_info + req = self.generate_request(call_id=3, + context_id=ctx1.context_id, + opnum=0, + stub=b"") + self.send_pdu(req) + rep = self.recv_pdu() + if first_req_fault_status is not None: + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | first_req_fault_flags, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, req.u.context_id) + self.assertEqual(rep.u.cancel_count, 0) + self.assertEqual(rep.u.flags, 0) + self.assertEqual(rep.u.status, first_req_fault_status) + self.assertEqual(rep.u.reserved, 0) + self.assertEqual(len(rep.u.error_and_verifier), 0) + + if first_req_fault_flags & dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE: + return + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, req.u.context_id) + self.assertEqual(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + # Now a request with auth_info DCERPC_AUTH_LEVEL_CONNECT + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_blob=b"\x01" + b"\x00" * 15) + req = self.generate_request(call_id=4, + context_id=ctx1.context_id, + opnum=0, + stub=b"", + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + # We don't get an auth_info back + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, req.u.context_id) + self.assertEqual(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + self._disconnect("disconnect") + self.assertNotConnected() + + def _test_auth_mid_pad_with_args(self, + auth_type, + use_auth3, + use_smb): + return self._test_auth_pad_raw(auth_type=auth_type, + use_auth3=use_auth3, + use_smb=use_smb, + bind_mid_padding=True, + third_mid_padding=True) + + def _test_auth_full_pad_with_args(self, + auth_type, + use_auth3, + use_smb): + return self._test_auth_pad_raw(auth_type=auth_type, + use_auth3=use_auth3, + use_smb=use_smb, + bind_mid_padding=True, + third_mid_padding=True, + bind_tail_padding=True, + third_tail_padding=True) + + def _test_auth_tail_pad_with_args(self, + auth_type, + use_auth3, + use_smb): + return self._test_auth_pad_raw(auth_type=auth_type, + use_auth3=use_auth3, + use_smb=use_smb, + bind_tail_padding=True, + third_tail_padding=True) + + def _test_auth_pad_bind_align2_with_args(self, + auth_type, + use_smb): + return self._test_auth_pad_raw(auth_type=auth_type, + use_smb=use_smb, + bind_auth_offset_diff=2) + + def _test_auth_pad_alter_align2_with_args(self, + auth_type, + use_smb): + return self._test_auth_pad_raw(auth_type=auth_type, + use_smb=use_smb, + third_auth_offset_diff=2) + + def _test_auth_pad_auth3_align2_with_args(self, + auth_type, + use_smb): + return self._test_auth_pad_raw(auth_type=auth_type, + use_smb=use_smb, + use_auth3=True, + third_auth_offset_diff=2) + + def _test_auth_pad_ntlm_2888_with_args(self, + use_auth3, + use_smb): + auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP + return self._test_auth_pad_raw(auth_type=auth_type, + use_smb=use_smb, + use_auth3=use_auth3, + bind_tail_padding=True, + bind_max_auth_length=2888, + third_tail_padding=True, + third_max_auth_length=2888) + + def _test_auth_pad_ntlm_2889_with_args(self, + use_auth3, + use_smb): + auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP + return self._test_auth_pad_raw(auth_type=auth_type, + use_smb=use_smb, + use_auth3=use_auth3, + third_tail_padding=True, + third_max_auth_length=2889) + + def _test_auth_pad_ntlm_2889_bind_with_args(self, + use_smb): + auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP + return self._test_auth_pad_raw(auth_type=auth_type, + use_smb=use_smb, + bind_tail_padding=True, + bind_max_auth_length=2889) + + @classmethod + def _setup_auth_pad_ignored(cls): + auth_types = { + "spnego": dcerpc.DCERPC_AUTH_TYPE_SPNEGO, + "ntlm": dcerpc.DCERPC_AUTH_TYPE_NTLMSSP, + } + third_methods = { + "alter": False, + "auth3": True, + } + transports = { + "tcp": False, + "smb": True, + } + + for auth_type in auth_types.keys(): + for third_method in third_methods.keys(): + for transport in transports.keys(): + tname = "%s_%s_%s" % ( + auth_type, + third_method, + transport + ) + targs = ( + auth_types[auth_type], + third_methods[third_method], + transports[transport], + ) + cls.generate_dynamic_test("test_auth_mid_pad", + tname, *targs) + cls.generate_dynamic_test("test_auth_full_pad", + tname, *targs) + cls.generate_dynamic_test("test_auth_tail_pad", + tname, *targs) + + for auth_type in auth_types.keys(): + for transport in transports.keys(): + tname = "%s_%s" % ( + auth_type, + transport + ) + targs = ( + auth_types[auth_type], + transports[transport], + ) + cls.generate_dynamic_test("test_auth_pad_bind_align2", + tname, *targs) + cls.generate_dynamic_test("test_auth_pad_alter_align2", + tname, *targs) + cls.generate_dynamic_test("test_auth_pad_auth3_align2", + tname, *targs) + + for third_method in third_methods.keys(): + for transport in transports.keys(): + tname = "%s_%s" % ( + third_method, + transport + ) + targs = ( + third_methods[third_method], + transports[transport], + ) + cls.generate_dynamic_test("test_auth_pad_ntlm_2888", + tname, *targs) + cls.generate_dynamic_test("test_auth_pad_ntlm_2889", + tname, *targs) + + for transport in transports.keys(): + tname = "%s" % ( + transport + ) + targs = ( + transports[transport], + ) + cls.generate_dynamic_test("test_auth_pad_ntlm_2889_bind", + tname, *targs) + + return + def test_spnego_auth_pad_ok_bind_legacy(self): ndr32 = base.transfer_syntax_ndr() diff --git a/selftest/knownfail.d/dcerpc-auth-pad b/selftest/knownfail.d/dcerpc-auth-pad new file mode 100644 index 00000000000..f1daffa3771 --- /dev/null +++ b/selftest/knownfail.d/dcerpc-auth-pad @@ -0,0 +1,19 @@ +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_full_pad_ntlm_alter +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_full_pad_ntlm_auth3 +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_full_pad_spnego_alter +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_full_pad_spnego_auth3 +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_mid_pad_ntlm_alter +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_mid_pad_ntlm_auth3 +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_mid_pad_spnego_alter +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_mid_pad_spnego_auth3 +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_pad_bind_align2_ntlm +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_pad_bind_align2_spnego +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_pad_auth3_align2_ntlm +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_pad_auth3_align2_spnego +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_pad_ntlm_2889_alter +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_pad_ntlm_2889_auth3 +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_pad_ntlm_2889_bind +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_tail_pad_ntlm_alter +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_tail_pad_ntlm_auth3 +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_tail_pad_spnego_alter +^samba.tests.dcerpc.raw_protocol.samba.tests.dcerpc.raw_protocol.TestDCERPC_BIND.test_auth_tail_pad_spnego_auth3