1
0
mirror of https://github.com/samba-team/samba.git synced 2025-08-21 09:49:28 +03:00

CVE-2022-2031 tests/krb5: Add methods to send and receive generic messages

This allows us to send and receive kpasswd messages, while avoiding the
existing logic for encoding and decoding other Kerberos message types.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15047
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15049
BUG: https://bugzilla.samba.org/show_bug.cgi?id=15074

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Andreas Schneider <asn@samba.org>
This commit is contained in:
Joseph Sutton
2022-05-24 19:20:28 +12:00
committed by Jule Anger
parent ae7dd875cd
commit 13fe7e013e

View File

@ -920,24 +920,28 @@ class RawKerberosTest(TestCaseInTempDir):
return blob
def send_pdu(self, req, asn1_print=None, hexdump=None):
k5_pdu = self.der_encode(
req, native_decode=False, asn1_print=asn1_print, hexdump=False)
self.send_msg(k5_pdu, hexdump=hexdump)
def send_msg(self, msg, hexdump=None):
header = struct.pack('>I', len(msg))
req_pdu = header
req_pdu += msg
self.hex_dump("send_msg", header, hexdump=hexdump)
self.hex_dump("send_msg", msg, hexdump=hexdump)
try:
k5_pdu = self.der_encode(
req, native_decode=False, asn1_print=asn1_print, hexdump=False)
header = struct.pack('>I', len(k5_pdu))
req_pdu = header
req_pdu += k5_pdu
self.hex_dump("send_pdu", header, hexdump=hexdump)
self.hex_dump("send_pdu", k5_pdu, hexdump=hexdump)
while True:
sent = self.s.send(req_pdu, 0)
if sent == len(req_pdu):
break
return
req_pdu = req_pdu[sent:]
except socket.error as e:
self._disconnect("send_pdu: %s" % e)
self._disconnect("send_msg: %s" % e)
raise
except IOError as e:
self._disconnect("send_pdu: %s" % e)
self._disconnect("send_msg: %s" % e)
raise
def recv_raw(self, num_recv=0xffff, hexdump=None, timeout=None):
@ -963,16 +967,14 @@ class RawKerberosTest(TestCaseInTempDir):
return rep_pdu
def recv_pdu_raw(self, asn1_print=None, hexdump=None, timeout=None):
rep_pdu = None
rep = None
raw_pdu = self.recv_raw(
num_recv=4, hexdump=hexdump, timeout=timeout)
if raw_pdu is None:
return (None, None)
return None
header = struct.unpack(">I", raw_pdu[0:4])
k5_len = header[0]
if k5_len == 0:
return (None, "")
return ""
missing = k5_len
rep_pdu = b''
while missing > 0:
@ -981,6 +983,14 @@ class RawKerberosTest(TestCaseInTempDir):
self.assertGreaterEqual(len(raw_pdu), 1)
rep_pdu += raw_pdu
missing = k5_len - len(rep_pdu)
return rep_pdu
def recv_reply(self, asn1_print=None, hexdump=None, timeout=None):
rep_pdu = self.recv_pdu_raw(asn1_print=asn1_print,
hexdump=hexdump,
timeout=timeout)
if not rep_pdu:
return None, rep_pdu
k5_raw = self.der_decode(
rep_pdu,
asn1Spec=None,
@ -1002,9 +1012,9 @@ class RawKerberosTest(TestCaseInTempDir):
return (rep, rep_pdu)
def recv_pdu(self, asn1_print=None, hexdump=None, timeout=None):
(rep, rep_pdu) = self.recv_pdu_raw(asn1_print=asn1_print,
hexdump=hexdump,
timeout=timeout)
(rep, rep_pdu) = self.recv_reply(asn1_print=asn1_print,
hexdump=hexdump,
timeout=timeout)
return rep
def assertIsConnected(self):