mirror of
https://github.com/samba-team/samba.git
synced 2025-08-24 21:49:29 +03:00
python/tests: add simple dcerpc orphaned tests
ORPHANED is mostly ignored. It's up to the application server implementation to install a orphaned handler. Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andreas Schneider <asn@samba.org>
This commit is contained in:
committed by
Andreas Schneider
parent
3c474cd489
commit
9ef8bfabc6
@ -2391,6 +2391,224 @@ class TestDCERPC_BIND(RawDCERPCTest):
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
|
||||
|
||||
def test_orphaned_no_request(self):
|
||||
ndr32 = base.transfer_syntax_ndr()
|
||||
abstract = samba.dcerpc.mgmt.abstract_syntax()
|
||||
ctx = self.prepare_presentation(abstract, ndr32)
|
||||
|
||||
req = self.generate_orphaned(call_id = 3)
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu(timeout=0.01)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
# And now try a request
|
||||
req = self.generate_request(call_id = 1,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu()
|
||||
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
|
||||
auth_length=0)
|
||||
self.assertNotEquals(rep.u.alloc_hint, 0)
|
||||
self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
|
||||
|
||||
def test_orphaned_request_after_first_last(self):
|
||||
ndr32 = base.transfer_syntax_ndr()
|
||||
abstract = samba.dcerpc.mgmt.abstract_syntax()
|
||||
ctx = self.prepare_presentation(abstract, ndr32)
|
||||
|
||||
req = self.generate_request(call_id = 1,
|
||||
pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
req = self.generate_orphaned(call_id = 1)
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
req = self.generate_request(call_id = 1,
|
||||
pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu()
|
||||
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
|
||||
auth_length=0)
|
||||
self.assertNotEquals(rep.u.alloc_hint, 0)
|
||||
self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
|
||||
|
||||
# And now try a request
|
||||
req = self.generate_request(call_id = 2,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu()
|
||||
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
|
||||
auth_length=0)
|
||||
self.assertNotEquals(rep.u.alloc_hint, 0)
|
||||
self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
|
||||
|
||||
def test_orphaned_request_after_first_mpx_last(self):
|
||||
ndr32 = base.transfer_syntax_ndr()
|
||||
abstract = samba.dcerpc.mgmt.abstract_syntax()
|
||||
|
||||
pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
|
||||
pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
|
||||
pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_CONC_MPX
|
||||
ctx = self.prepare_presentation(abstract, ndr32, pfc_flags=pfc_flags)
|
||||
|
||||
req = self.generate_request(call_id = 1,
|
||||
pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
req = self.generate_orphaned(call_id = 1)
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
req = self.generate_request(call_id = 1,
|
||||
pfc_flags=dcerpc.DCERPC_PFC_FLAG_LAST,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu()
|
||||
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
|
||||
auth_length=0)
|
||||
self.assertNotEquals(rep.u.alloc_hint, 0)
|
||||
self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
|
||||
|
||||
# And now try a request
|
||||
req = self.generate_request(call_id = 2,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu()
|
||||
self.verify_pdu(rep, dcerpc.DCERPC_PKT_RESPONSE, req.call_id,
|
||||
auth_length=0)
|
||||
self.assertNotEquals(rep.u.alloc_hint, 0)
|
||||
self.assertEquals(rep.u.context_id, req.u.context_id & 0xff)
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertGreaterEqual(len(rep.u.stub_and_verifier), rep.u.alloc_hint)
|
||||
|
||||
def test_orphaned_request_after_first_no_last(self):
|
||||
ndr32 = base.transfer_syntax_ndr()
|
||||
abstract = samba.dcerpc.mgmt.abstract_syntax()
|
||||
ctx = self.prepare_presentation(abstract, ndr32)
|
||||
|
||||
req1 = self.generate_request(call_id = 1,
|
||||
pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req1)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
req = self.generate_orphaned(call_id = 1)
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
# And now try a new request
|
||||
req2 = self.generate_request(call_id = 2,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req2)
|
||||
rep = self.recv_pdu()
|
||||
self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req1.call_id,
|
||||
auth_length=0)
|
||||
self.assertNotEquals(rep.u.alloc_hint, 0)
|
||||
self.assertEquals(rep.u.context_id, req1.u.context_id)
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertEquals(rep.u.flags, 0)
|
||||
self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR)
|
||||
self.assertEquals(rep.u.reserved, 0)
|
||||
self.assertEquals(len(rep.u.error_and_verifier), 0)
|
||||
|
||||
# wait for a disconnect
|
||||
rep = self.recv_pdu()
|
||||
self.assertIsNone(rep)
|
||||
self.assertNotConnected()
|
||||
|
||||
def test_orphaned_request_after_first_mpx_no_last(self):
|
||||
ndr32 = base.transfer_syntax_ndr()
|
||||
abstract = samba.dcerpc.mgmt.abstract_syntax()
|
||||
|
||||
pfc_flags = samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_FIRST
|
||||
pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_LAST
|
||||
pfc_flags |= samba.dcerpc.dcerpc.DCERPC_PFC_FLAG_CONC_MPX
|
||||
ctx = self.prepare_presentation(abstract, ndr32,
|
||||
pfc_flags=pfc_flags)
|
||||
|
||||
req1 = self.generate_request(call_id = 1,
|
||||
pfc_flags=dcerpc.DCERPC_PFC_FLAG_FIRST,
|
||||
context_id=ctx.context_id,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req1)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
req = self.generate_orphaned(call_id = 1)
|
||||
self.send_pdu(req)
|
||||
rep = self.recv_pdu(timeout=0.1)
|
||||
self.assertIsNone(rep)
|
||||
self.assertIsConnected()
|
||||
|
||||
# And now try a new request
|
||||
req2 = self.generate_request(call_id = 2,
|
||||
context_id=ctx.context_id-1,
|
||||
opnum=0,
|
||||
stub="")
|
||||
self.send_pdu(req2)
|
||||
rep = self.recv_pdu()
|
||||
self.verify_pdu(rep, dcerpc.DCERPC_PKT_FAULT, req2.call_id,
|
||||
auth_length=0)
|
||||
self.assertNotEquals(rep.u.alloc_hint, 0)
|
||||
self.assertEquals(rep.u.context_id, 0)
|
||||
self.assertEquals(rep.u.cancel_count, 0)
|
||||
self.assertEquals(rep.u.flags, 0)
|
||||
self.assertEquals(rep.u.status, dcerpc.DCERPC_NCA_S_PROTO_ERROR)
|
||||
self.assertEquals(rep.u.reserved, 0)
|
||||
self.assertEquals(len(rep.u.error_and_verifier), 0)
|
||||
|
||||
# wait for a disconnect
|
||||
rep = self.recv_pdu()
|
||||
self.assertIsNone(rep)
|
||||
self.assertNotConnected()
|
||||
|
||||
def test_spnego_connect_request(self):
|
||||
ndr32 = base.transfer_syntax_ndr()
|
||||
|
||||
|
Reference in New Issue
Block a user