diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index d5cd10d724a..d10f767371b 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -5170,7 +5170,7 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertIsNone(rep) self.assertNotConnected() - def test_spnego_auth_pad_ok(self): + def test_spnego_auth_pad_ok_bind_legacy(self): ndr32 = base.transfer_syntax_ndr() tsf1_list = [ndr32] @@ -5335,7 +5335,171 @@ class TestDCERPC_BIND(RawDCERPCTest): self._disconnect("disconnect") self.assertNotConnected() - def test_spnego_auth_pad_fail_bind(self): + def test_spnego_auth_pad_ok_alter_legacy(self): + ndr32 = base.transfer_syntax_ndr() + + tsf1_list = [ndr32] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1.transfer_syntaxes = tsf1_list + ctx_list = [ctx1] + + c = self.get_anon_creds() + g = gensec.Security.start_client(self.settings) + g.set_credentials(c) + g.want_feature(gensec.FEATURE_DCE_STYLE) + auth_type = dcerpc.DCERPC_AUTH_TYPE_SPNEGO + auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT + auth_context_id = 2 + g.start_mech_by_authtype(auth_type, auth_level) + from_server = b"" + (finished, to_server) = g.update(from_server) + self.assertFalse(finished) + + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_blob=to_server) + + req = self.generate_bind(call_id=0, + ctx_list=ctx_list, + auth_info=auth_info) + req_pdu = samba.ndr.ndr_pack(req) + + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_pad_length=0, + auth_blob=to_server) + + req = self.generate_bind(call_id=0, + ctx_list=ctx_list, + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) + self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEqual(rep.u.secondary_address_size, 4) + self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertPadding(rep.u._pad1, 2) + self.assertEqual(rep.u.num_results, 1) + self.assertEqual(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEqual(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertNotEqual(len(rep.u.auth_info), 0) + a = self.parse_auth(rep.u.auth_info) + + from_server = a.credentials + (finished, to_server) = g.update(from_server) + self.assertFalse(finished) + + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_blob=to_server) + req = self.generate_alter(call_id=0, + ctx_list=ctx_list, + assoc_group_id=rep.u.assoc_group_id, + auth_info=auth_info) + req_pdu = samba.ndr.ndr_pack(req) + + auth_pad_ok = len(req_pdu) + auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH + auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH + auth_pad_ok -= len(to_server) + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_pad_length=auth_pad_ok, + auth_blob=to_server) + req = self.generate_alter(call_id=0, + ctx_list=ctx_list, + assoc_group_id=rep.u.assoc_group_id, + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + if not self.allow_bind_auth_pad: + # modern server (e.g. 2022) + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, 0) + self.assertEqual(rep.u.cancel_count, 0) + self.assertEqual(rep.u.flags, 0) + self.assertEqual(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR) + self.assertEqual(rep.u.reserved, 0) + self.assertEqual(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id) + self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertEqual(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEqual(rep.u.secondary_address_size, 0) + self.assertPadding(rep.u._pad1, 2) + self.assertEqual(rep.u.num_results, 1) + self.assertEqual(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEqual(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertNotEqual(len(rep.u.auth_info), 0) + a = self.parse_auth(rep.u.auth_info) + + from_server = a.credentials + (finished, to_server) = g.update(from_server) + self.assertTrue(finished) + + # And now try a request without auth_info + req = self.generate_request(call_id=2, + context_id=ctx1.context_id, + opnum=0, + stub=b"") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, req.u.context_id) + self.assertEqual(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + # Now a request with auth_info DCERPC_AUTH_LEVEL_CONNECT + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_blob=b"\x01" + b"\x00" * 15) + req = self.generate_request(call_id=3, + context_id=ctx1.context_id, + opnum=0, + stub=b"", + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + # We don't get an auth_info back + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, req.u.context_id) + self.assertEqual(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + self._disconnect("disconnect") + self.assertNotConnected() + + def test_spnego_auth_pad_fail_bind_legacy(self): ndr32 = base.transfer_syntax_ndr() tsf1_list = [ndr32] @@ -5526,7 +5690,7 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertIsNone(rep) self.assertNotConnected() - def test_ntlmssp_auth_pad_ok(self): + def test_ntlmssp_auth_pad_ok_auth3(self): ndr32 = base.transfer_syntax_ndr() tsf1_list = [ndr32] @@ -5693,6 +5857,208 @@ class TestDCERPC_BIND(RawDCERPCTest): auth_info=auth_info) req_pdu = samba.ndr.ndr_pack(req) + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_pad_length=0, + auth_blob=to_server) + + req = self.generate_bind(call_id=0, + ctx_list=ctx_list, + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) + self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEqual(rep.u.secondary_address_size, 4) + self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertPadding(rep.u._pad1, 2) + self.assertEqual(rep.u.num_results, 1) + self.assertEqual(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEqual(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertNotEqual(len(rep.u.auth_info), 0) + a = self.parse_auth(rep.u.auth_info) + + from_server = a.credentials + (finished, to_server) = g.update(from_server) + self.assertTrue(finished) + + auth_pad_bad = 1 + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_pad_length=auth_pad_bad, + auth_blob=to_server) + req = self.generate_auth3(call_id=0, + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, 0) + self.assertEqual(rep.u.cancel_count, 0) + self.assertEqual(rep.u.flags, 0) + self.assertEqual(rep.u.status, dcerpc.DCERPC_NCA_S_FAULT_REMOTE_NO_MEMORY) + self.assertEqual(rep.u.reserved, 0) + self.assertEqual(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + + def test_ntlmssp_auth_pad_fail_bind_legacy(self): + ndr32 = base.transfer_syntax_ndr() + + tsf1_list = [ndr32] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1.transfer_syntaxes = tsf1_list + ctx_list = [ctx1] + + c = self.get_anon_creds() + g = gensec.Security.start_client(self.settings) + g.set_credentials(c) + g.want_feature(gensec.FEATURE_DCE_STYLE) + auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP + auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT + auth_context_id = 2 + g.start_mech_by_authtype(auth_type, auth_level) + from_server = b"" + (finished, to_server) = g.update(from_server) + self.assertFalse(finished) + + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_blob=to_server) + + req = self.generate_bind(call_id=0, + ctx_list=ctx_list, + auth_info=auth_info) + req_pdu = samba.ndr.ndr_pack(req) + + auth_pad_ok = len(req_pdu) + auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH + auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH + auth_pad_ok -= len(to_server) + + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_pad_length=auth_pad_ok, + auth_blob=to_server) + + req = self.generate_bind(call_id=0, + ctx_list=ctx_list, + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + if not self.allow_bind_auth_pad: + # modern server (e.g. 2022) + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEqual(rep.u.reject_reason, + dcerpc.DCERPC_BIND_NAK_REASON_PROTOCOL_VERSION_NOT_SUPPORTED) + self.assertEqual(rep.u.num_versions, 1) + self.assertEqual(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEqual(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertPadding(rep.u._pad, 3) + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + return + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id) + self.assertEqual(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEqual(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEqual(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEqual(rep.u.secondary_address_size, 4) + self.assertEqual(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertPadding(rep.u._pad1, 2) + self.assertEqual(rep.u.num_results, 1) + self.assertEqual(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEqual(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertNotEqual(len(rep.u.auth_info), 0) + a = self.parse_auth(rep.u.auth_info) + + from_server = a.credentials + (finished, to_server) = g.update(from_server) + self.assertTrue(finished) + + auth_pad_bad = 1 + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_pad_length=auth_pad_bad, + auth_blob=to_server) + req = self.generate_auth3(call_id=0, + auth_info=auth_info) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEqual(rep.u.alloc_hint, 0) + self.assertEqual(rep.u.context_id, 0) + self.assertEqual(rep.u.cancel_count, 0) + self.assertEqual(rep.u.flags, 0) + self.assertEqual(rep.u.status, dcerpc.DCERPC_NCA_S_FAULT_REMOTE_NO_MEMORY) + self.assertEqual(rep.u.reserved, 0) + self.assertEqual(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + + def test_ntlmssp_auth_pad_fail_auth3_lagacy(self): + ndr32 = base.transfer_syntax_ndr() + + tsf1_list = [ndr32] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1.transfer_syntaxes = tsf1_list + ctx_list = [ctx1] + + c = self.get_anon_creds() + g = gensec.Security.start_client(self.settings) + g.set_credentials(c) + g.want_feature(gensec.FEATURE_DCE_STYLE) + auth_type = dcerpc.DCERPC_AUTH_TYPE_NTLMSSP + auth_level = dcerpc.DCERPC_AUTH_LEVEL_CONNECT + auth_context_id = 2 + g.start_mech_by_authtype(auth_type, auth_level) + from_server = b"" + (finished, to_server) = g.update(from_server) + self.assertFalse(finished) + + auth_info = self.generate_auth(auth_type=auth_type, + auth_level=auth_level, + auth_context_id=auth_context_id, + auth_blob=to_server) + + req = self.generate_bind(call_id=0, + ctx_list=ctx_list, + auth_info=auth_info) + req_pdu = samba.ndr.ndr_pack(req) + auth_pad_ok = len(req_pdu) auth_pad_ok -= dcerpc.DCERPC_REQUEST_LENGTH auth_pad_ok -= dcerpc.DCERPC_AUTH_TRAILER_LENGTH