1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-04 17:47:26 +03:00

python/tests: add bind time feature related tests to dcerpc raw protocol tests

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
This commit is contained in:
Stefan Metzmacher 2015-10-23 15:39:34 +02:00 committed by Andreas Schneider
parent fe5b462a76
commit 60099d491b

View File

@ -1296,6 +1296,152 @@ class TestDCERPC_BIND(RawDCERPCTest):
self.assertEquals(rep.u.cancel_count, 0)
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
def test_no_auth_bind_time_none_simple(self):
features = 0
btf = base.bind_time_features_syntax(features)
zero_syntax = misc.ndr_syntax_id()
tsf1_list = [btf]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
ctx1.abstract_syntax = zero_syntax
ctx1.transfer_syntaxes = tsf1_list
req = self.generate_bind(call_id=0, ctx_list=[ctx1])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
auth_length=0)
self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
self.assertEquals(rep.u.secondary_address_size, 4)
self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
self.assertEquals(len(rep.u._pad1), 2)
self.assertEquals(rep.u._pad1, '\0' * 2)
self.assertEquals(rep.u.num_results, 1)
self.assertEquals(rep.u.ctx_list[0].result,
dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK)
self.assertEquals(rep.u.ctx_list[0].reason, features)
self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
self.assertEquals(rep.u.auth_info, '\0' * 0)
def test_no_auth_bind_time_none_ignore_additional(self):
features1 = 0
btf1 = base.bind_time_features_syntax(features1)
features2 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
features2 |= dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
btf2 = base.bind_time_features_syntax(features2)
zero_syntax = misc.ndr_syntax_id()
ndr64 = base.transfer_syntax_ndr64()
tsf1_list = [btf1,btf2,zero_syntax]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
ctx1.abstract_syntax = ndr64
ctx1.transfer_syntaxes = tsf1_list
req = self.generate_bind(call_id=0, ctx_list=[ctx1])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
auth_length=0)
self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
self.assertEquals(rep.u.secondary_address_size, 4)
self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
self.assertEquals(len(rep.u._pad1), 2)
self.assertEquals(rep.u._pad1, '\0' * 2)
self.assertEquals(rep.u.num_results, 1)
self.assertEquals(rep.u.ctx_list[0].result,
dcerpc.DCERPC_BIND_ACK_RESULT_NEGOTIATE_ACK)
self.assertEquals(rep.u.ctx_list[0].reason, features1)
self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
self.assertEquals(rep.u.auth_info, '\0' * 0)
def test_no_auth_bind_time_only_first(self):
features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
btf1 = base.bind_time_features_syntax(features1)
features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
btf2 = base.bind_time_features_syntax(features2)
zero_syntax = misc.ndr_syntax_id()
tsf1_list = [zero_syntax,btf1,btf2,zero_syntax]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
ctx1.abstract_syntax = zero_syntax
ctx1.transfer_syntaxes = tsf1_list
req = self.generate_bind(call_id=0, ctx_list=[ctx1])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_ACK, req.call_id,
auth_length=0)
self.assertEquals(rep.u.max_xmit_frag, req.u.max_xmit_frag)
self.assertEquals(rep.u.max_recv_frag, req.u.max_recv_frag)
self.assertNotEquals(rep.u.assoc_group_id, req.u.assoc_group_id)
self.assertEquals(rep.u.secondary_address_size, 4)
self.assertEquals(rep.u.secondary_address, "%d" % self.tcp_port)
self.assertEquals(len(rep.u._pad1), 2)
self.assertEquals(rep.u._pad1, '\0' * 2)
self.assertEquals(rep.u.num_results, 1)
self.assertEquals(rep.u.ctx_list[0].result,
dcerpc.DCERPC_BIND_ACK_RESULT_PROVIDER_REJECTION)
self.assertEquals(rep.u.ctx_list[0].reason,
dcerpc.DCERPC_BIND_ACK_REASON_ABSTRACT_SYNTAX_NOT_SUPPORTED)
self.assertNDRSyntaxEquals(rep.u.ctx_list[0].syntax, zero_syntax)
self.assertEquals(rep.u.auth_info, '\0' * 0)
def test_no_auth_bind_time_twice(self):
features1 = dcerpc.DCERPC_BIND_TIME_KEEP_CONNECTION_ON_ORPHAN
btf1 = base.bind_time_features_syntax(features1)
features2 = dcerpc.DCERPC_BIND_TIME_SECURITY_CONTEXT_MULTIPLEXING
btf2 = base.bind_time_features_syntax(features2)
zero_syntax = misc.ndr_syntax_id()
tsf1_list = [btf1]
ctx1 = dcerpc.ctx_list()
ctx1.context_id = 1
ctx1.num_transfer_syntaxes = len(tsf1_list)
ctx1.abstract_syntax = zero_syntax
ctx1.transfer_syntaxes = tsf1_list
tsf2_list = [btf2]
ctx2 = dcerpc.ctx_list()
ctx2.context_id = 2
ctx2.num_transfer_syntaxes = len(tsf2_list)
ctx2.abstract_syntax = zero_syntax
ctx2.transfer_syntaxes = tsf2_list
req = self.generate_bind(call_id=0, ctx_list=[ctx1,ctx2])
self.send_pdu(req)
rep = self.recv_pdu()
self.verify_pdu(rep, dcerpc.DCERPC_PKT_BIND_NAK, req.call_id,
auth_length=0)
self.assertEquals(rep.u.reject_reason,
dcerpc.DCERPC_BIND_NAK_REASON_NOT_SPECIFIED)
self.assertEquals(rep.u.num_versions, 1)
self.assertEquals(rep.u.versions[0].rpc_vers, req.rpc_vers)
self.assertEquals(rep.u.versions[0].rpc_vers_minor, req.rpc_vers_minor)
self.assertEquals(len(rep.u._pad), 3)
self.assertEquals(rep.u._pad, '\0' * 3)
# wait for a disconnect
rep = self.recv_pdu()
self.assertIsNone(rep)
self.assertNotConnected()
def _test_auth_none_level_bind(self, auth_level,
reason=dcerpc.DCERPC_BIND_NAK_REASON_INVALID_AUTH_TYPE):
ndr32 = base.transfer_syntax_ndr()