1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00

RawDCERPCTest: split prepare_pdu() and send_pdu_blob() out of send_pdu()

This will make it possible to alter pdus before sending them to the
server.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14356

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andreas Schneider <asn@samba.org>
(cherry picked from commit 444f9c6624)
This commit is contained in:
Stefan Metzmacher 2020-11-09 14:00:43 +01:00 committed by Jule Anger
parent d921255c84
commit f2705e5b3b

View File

@ -677,34 +677,45 @@ class RawDCERPCTest(TestCase):
self.secondary_address = None
self.connect()
def send_pdu(self, req, ndr_print=None, hexdump=None):
def prepare_pdu(self, req, ndr_print=None, hexdump=None):
if ndr_print is None:
ndr_print = self.do_ndr_print
if hexdump is None:
hexdump = self.do_hexdump
req_pdu = ndr_pack(req)
if ndr_print:
sys.stderr.write("prepare_pdu: %s" % samba.ndr.ndr_print(req))
if hexdump:
sys.stderr.write("prepare_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
return req_pdu
def send_pdu_blob(self, req_pdu, hexdump=None):
if hexdump is None:
hexdump = self.do_hexdump
try:
req_pdu = ndr_pack(req)
if ndr_print:
sys.stderr.write("send_pdu: %s" % samba.ndr.ndr_print(req))
if hexdump:
sys.stderr.write("send_pdu: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
sys.stderr.write("send_pdu_blob: %d\n%s" % (len(req_pdu), self.hexdump(req_pdu)))
while True:
sent = self.s.send(req_pdu, 0)
if sent == len(req_pdu):
break
req_pdu = req_pdu[sent:]
except socket.error as e:
self._disconnect("send_pdu: %s" % e)
self._disconnect("send_pdu_blob: %s" % e)
raise
except IOError as e:
self._disconnect("send_pdu: %s" % e)
self._disconnect("send_pdu_blob: %s" % e)
raise
except NTSTATUSError as e:
self._disconnect("send_pdu: %s" % e)
self._disconnect("send_pdu_blob: %s" % e)
raise
finally:
pass
def send_pdu(self, req, ndr_print=None, hexdump=None):
req_pdu = self.prepare_pdu(req, ndr_print=ndr_print, hexdump=False)
return self.send_pdu_blob(req_pdu, hexdump=hexdump)
def recv_raw(self, hexdump=None, timeout=None):
rep_pdu = None
if hexdump is None: