diff --git a/python/samba/tests/dcerpc/raw_protocol.py b/python/samba/tests/dcerpc/raw_protocol.py index 77a5fd5967b..3c1d4adb3bc 100755 --- a/python/samba/tests/dcerpc/raw_protocol.py +++ b/python/samba/tests/dcerpc/raw_protocol.py @@ -24,6 +24,7 @@ os.environ["PYTHONUNBUFFERED"] = "1" import samba.dcerpc.dcerpc as dcerpc import samba.dcerpc.base as base +import samba.dcerpc.misc as misc import samba.dcerpc.epmapper import samba.dcerpc.mgmt import samba.dcerpc.netlogon @@ -460,15 +461,6 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertEquals(len(rep.u._pad), 3) self.assertEquals(rep.u._pad, '\0' * 3) - ndr32 = base.transfer_syntax_ndr() - - tsf1_list = [ndr32] - ctx1 = dcerpc.ctx_list() - ctx1.context_id = 1 - ctx1.num_transfer_syntaxes = len(tsf1_list) - ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() - ctx1.transfer_syntaxes = tsf1_list - # wait for a disconnect rep = self.recv_pdu() self.assertIsNone(rep) @@ -525,6 +517,785 @@ class TestDCERPC_BIND(RawDCERPCTest): self.assertIsNone(rep) self.assertNotConnected() + def test_no_auth_presentation_ctx_valid1(self): + ndr32 = base.transfer_syntax_ndr() + + zero_syntax = misc.ndr_syntax_id() + + tsf1_list = [zero_syntax, ndr32] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1.transfer_syntaxes = tsf1_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + # Send a alter + req = self.generate_alter(call_id=1, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 2, + context_id=ctx1.context_id, + opnum=0xffff, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, ctx1.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_OP_RNG_ERROR) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + + def test_no_auth_presentation_ctx_invalid1(self): + ndr32 = base.transfer_syntax_ndr() + + zero_syntax = misc.ndr_syntax_id() + + tsf1_list = [ndr32] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = ndr32 + ctx1.transfer_syntaxes = tsf1_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + # Send a alter + req = self.generate_alter(call_id=1, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 2, + context_id=12345, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, 0) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_UNKNOWN_IF) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + + # Send a alter again to prove the connection is still alive + req = self.generate_alter(call_id=3, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + def test_no_auth_presentation_ctx_invalid2(self): + ndr32 = base.transfer_syntax_ndr() + + zero_syntax = misc.ndr_syntax_id() + + tsf1a_list = [] + ctx1a = dcerpc.ctx_list() + ctx1a.context_id = 1 + ctx1a.num_transfer_syntaxes = len(tsf1a_list) + ctx1a.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1a.transfer_syntaxes = tsf1a_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1a]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.reject_reason, + dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED) + self.assertEquals(rep.u.num_versions, 1) + self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers) + self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor) + self.assertEquals(len(rep.u._pad), 3) + self.assertEquals(rep.u._pad, '\0' * 3) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + + def test_no_auth_presentation_ctx_invalid3(self): + ndr32 = base.transfer_syntax_ndr() + + zero_syntax = misc.ndr_syntax_id() + + tsf1a_list = [zero_syntax, ndr32, ndr32, ndr32] + ctx1a = dcerpc.ctx_list() + ctx1a.context_id = 1 + ctx1a.num_transfer_syntaxes = len(tsf1a_list) + ctx1a.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1a.transfer_syntaxes = tsf1a_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1a]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + tsf1b_list = [] + ctx1b = dcerpc.ctx_list() + ctx1b.context_id = 1 + ctx1b.num_transfer_syntaxes = len(tsf1b_list) + ctx1b.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1b.transfer_syntaxes = tsf1b_list + + # Send a alter + req = self.generate_alter(call_id=1, ctx_list=[ctx1b]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, 0) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + + def test_no_auth_presentation_ctx_invalid4(self): + ndr32 = base.transfer_syntax_ndr() + ndr64 = base.transfer_syntax_ndr64() + + zero_syntax = misc.ndr_syntax_id() + + tsf1a_list = [zero_syntax, ndr32, ndr32, ndr32] + ctx1a = dcerpc.ctx_list() + ctx1a.context_id = 1 + ctx1a.num_transfer_syntaxes = len(tsf1a_list) + ctx1a.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1a.transfer_syntaxes = tsf1a_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1a]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + # With a known but wrong syntax we get a protocol error + # see test_no_auth_presentation_ctx_valid2 + tsf1b_list = [zero_syntax,samba.dcerpc.epmapper.abstract_syntax(),ndr64] + ctx1b = dcerpc.ctx_list() + ctx1b.context_id = 1 + ctx1b.num_transfer_syntaxes = len(tsf1b_list) + ctx1b.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1b.transfer_syntaxes = tsf1b_list + + # Send a alter + req = self.generate_alter(call_id=1, ctx_list=[ctx1b]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, 0) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + + # wait for a disconnect + rep = self.recv_pdu() + self.assertIsNone(rep) + self.assertNotConnected() + + def test_no_auth_presentation_ctx_valid2(self): + ndr32 = base.transfer_syntax_ndr() + + zero_syntax = misc.ndr_syntax_id() + + tsf1a_list = [zero_syntax, ndr32, ndr32, ndr32] + ctx1a = dcerpc.ctx_list() + ctx1a.context_id = 1 + ctx1a.num_transfer_syntaxes = len(tsf1a_list) + ctx1a.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1a.transfer_syntaxes = tsf1a_list + + req = self.generate_bind(call_id=0, ctx_list=[ctx1a]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + # With a unknown but wrong syntaxes we get NO protocol error + # see test_no_auth_presentation_ctx_invalid4 + tsf1b_list = [zero_syntax,samba.dcerpc.epmapper.abstract_syntax()] + ctx1b = dcerpc.ctx_list() + ctx1b.context_id = 1 + ctx1b.num_transfer_syntaxes = len(tsf1b_list) + ctx1b.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1b.transfer_syntaxes = tsf1b_list + + # Send a alter + req = self.generate_alter(call_id=1, ctx_list=[ctx1b]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_TRANSFER_SYNTAXES_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 2, + context_id=ctx1a.context_id, + opnum=0xffff, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req.call_id, + pfc_flags=req.pfc_flags | + dcerpc.DCERPC_PFC_FLAG_DID_NOT_EXECUTE, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, ctx1a.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertEquals(rep.u.flags, 0) + self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_OP_RNG_ERROR) + self.assertEquals(rep.u.reserved, 0) + self.assertEquals(len(rep.u.error_and_verifier), 0) + + def test_no_auth_presentation_ctx_no_ndr64(self): + ndr32 = base.transfer_syntax_ndr() + zero_syntax = misc.ndr_syntax_id() + + tsfZ_list = [zero_syntax] + ctxZ = dcerpc.ctx_list() + ctxZ.context_id = 54321 + ctxZ.num_transfer_syntaxes = len(tsfZ_list) + ctxZ.abstract_syntax = zero_syntax + ctxZ.transfer_syntaxes = tsfZ_list + + req = self.generate_bind(call_id=0, ctx_list=[ctxZ]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 4) + self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port) + self.assertEquals(len(rep.u._pad1), 2) + self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + tsf0_list = [ndr32] + ctx0 = dcerpc.ctx_list() + ctx0.context_id = 0 + ctx0.num_transfer_syntaxes = len(tsf0_list) + ctx0.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx0.transfer_syntaxes = tsf0_list + + req = self.generate_alter(call_id=0, ctx_list=[ctx0]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx0.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + tsf1_list = [zero_syntax,ndr32] + ctx1 = dcerpc.ctx_list() + ctx1.context_id = 1 + ctx1.num_transfer_syntaxes = len(tsf1_list) + ctx1.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx1.transfer_syntaxes = tsf1_list + + req = self.generate_alter(call_id=1, ctx_list=[ctx1]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx1.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + tsf2_list = [ndr32,ndr32] + ctx2 = dcerpc.ctx_list() + ctx2.context_id = 2 + ctx2.num_transfer_syntaxes = len(tsf2_list) + ctx2.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx2.transfer_syntaxes = tsf2_list + + req = self.generate_alter(call_id=2, ctx_list=[ctx2]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 1) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx2.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + tsf3_list = [ndr32] + ctx3 = dcerpc.ctx_list() + ctx3.context_id = 3 + ctx3.num_transfer_syntaxes = len(tsf3_list) + ctx3.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx3.transfer_syntaxes = tsf3_list + + tsf4_list = [ndr32] + ctx4 = dcerpc.ctx_list() + ctx4.context_id = 4 + ctx4.num_transfer_syntaxes = len(tsf4_list) + ctx4.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx4.transfer_syntaxes = tsf4_list + + req = self.generate_alter(call_id=34, ctx_list=[ctx3,ctx4]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 2) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.ctx_list[1].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[1].reason, + dcerpc.DCERPC_BIND_ACK_REASON_TRANSFER_SYNTAXES_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[1].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx3.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + req = self.generate_alter(call_id=43, ctx_list=[ctx4,ctx3]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 2) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.ctx_list[1].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[1].reason, + dcerpc.DCERPC_BIND_ACK_REASON_TRANSFER_SYNTAXES_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[1].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx4.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + req = self.generate_request(call_id = 1, + context_id=ctx3.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + req = self.generate_alter(call_id=44, ctx_list=[ctx4,ctx4]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 2) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.ctx_list[1].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[1].reason, + dcerpc.DCERPC_BIND_ACK_REASON_TRANSFER_SYNTAXES_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[1].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx4.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + req = self.generate_request(call_id = 1, + context_id=ctx3.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + tsf5mgmt_list = [ndr32] + ctx5mgmt = dcerpc.ctx_list() + ctx5mgmt.context_id = 5 + ctx5mgmt.num_transfer_syntaxes = len(tsf5mgmt_list) + ctx5mgmt.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx5mgmt.transfer_syntaxes = tsf5mgmt_list + + tsf5epm_list = [ndr32] + ctx5epm = dcerpc.ctx_list() + ctx5epm.context_id = 5 + ctx5epm.num_transfer_syntaxes = len(tsf5epm_list) + ctx5epm.abstract_syntax = samba.dcerpc.mgmt.abstract_syntax() + ctx5epm.transfer_syntaxes = tsf5epm_list + + req = self.generate_alter(call_id=55, ctx_list=[ctx5mgmt,ctx5epm]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 2) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.ctx_list[1].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[1].reason, + dcerpc.DCERPC_BIND_ACK_REASON_TRANSFER_SYNTAXES_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[1].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx5mgmt.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + + req = self.generate_alter(call_id=55, ctx_list=[ctx5mgmt,ctx5epm]) + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_ALTER_RESP, req.call_id, + auth_length=0) + self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag) + self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag) + self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id) + self.assertEquals(rep.u.secondary_address_size, 0) + self.assertEquals(len(rep.u._pad1), 2) + #self.assertEquals(rep.u._pad1, '\0' * 2) + self.assertEquals(rep.u.num_results, 2) + self.assertEquals(rep.u.ctx_list[0].result, + dcerpc.DCERPC_BIND_ACK_RESULT_ACCEPTANCE) + self.assertEquals(rep.u.ctx_list[0].reason, + dcerpc.DCERPC_BIND_ACK_REASON_NOT_SPECIFIED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, ndr32) + self.assertEquals(rep.u.ctx_list[1].result, + dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION) + self.assertEquals(rep.u.ctx_list[1].reason, + dcerpc.DCERPC_BIND_ACK_REASON_TRANSFER_SYNTAXES_NOT_SUPPORTED) + self.assertNDRSyntaxEquals(rep.u.ctx_list[1].syntax, zero_syntax) + self.assertEquals(rep.u.auth_info, '\0' * 0) + + req = self.generate_request(call_id = 1, + context_id=ctx5mgmt.context_id, + opnum=0, + stub="") + self.send_pdu(req) + rep = self.recv_pdu() + self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id, + auth_length=0) + self.assertNotEquals(rep.u.alloc_hint, 0) + self.assertEquals(rep.u.context_id, req.u.context_id) + self.assertEquals(rep.u.cancel_count, 0) + self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint) + def _test_auth_none_level_bind(self, auth_level, reason=dcerpc.DCERPC_BIND_NAK_REASON_INVALID_AUTH_TYPE): ndr32 = base.transfer_syntax_ndr() diff --git a/selftest/knownfail b/selftest/knownfail index e5abe627c8a..2e8f351edf7 100644 --- a/selftest/knownfail +++ b/selftest/knownfail @@ -295,5 +295,6 @@ ^samba4.smb2.ioctl.copy_chunk_bad_access ^samba4.drs.getnc_exop.python.*getnc_exop.DrsReplicaPrefixMapTestCase.test_regular_prefix_map_ex_attid.* # We don't support NDR64 yet, so we generate the wrong FAULT code +^samba.tests.dcerpc.raw_protocol.*.TestDCERPC_BIND.test_no_auth_presentation_ctx_invalid4 ^samba.tests.dcerpc.raw_protocol.*.TestDCERPC_BIND.test_spnego_change_auth_type2 ^samba.tests.dcerpc.raw_protocol.*.TestDCERPC_BIND.test_spnego_change_transfer