1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-07 00:58:40 +03:00

CVE-2016-0771: tests/dns: RPC => DNS roundtrip test

Make sure that TXT entries stored via RPC come out the same in DNS.

This has one caveat in that adding over RPC in Windows eats slashes,
and so fails there.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=11128
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11686

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Garming Sam 2016-01-28 12:54:58 +13:00 committed by Karolin Seeger
parent 2b4c7dbc15
commit ad5e885c2b
2 changed files with 190 additions and 13 deletions

View File

@ -23,6 +23,7 @@ import samba.ndr as ndr
from samba import credentials, param
from samba.tests import TestCase
from samba.dcerpc import dns, dnsp, dnsserver
from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record
# This timeout only has relevance when testing against Windows
# Format errors tend to return patchy responses, so a timeout is needed.
@ -879,7 +880,7 @@ class TestZones(DNSTest):
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
self.zone = "test.lan"
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.lp, self.creds)
def tearDown(self):
@ -899,7 +900,7 @@ class TestZones(DNSTest):
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
self.server,
self.server_ip,
None,
0,
'ZoneCreate',
@ -909,7 +910,7 @@ class TestZones(DNSTest):
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
self.server,
self.server_ip,
zone,
0,
'DeleteZoneFromDs',
@ -957,12 +958,31 @@ class TestRPCRoundtrip(DNSTest):
self.lp = self.get_loadparm()
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.lp, self.creds)
def tearDown(self):
super(TestRPCRoundtrip, self).tearDown()
def test_update_add_txt_rpc_to_dns(self):
prefix, txt = 'rpctextrec', ['"This is a test"']
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
def test_update_add_null_padded_txt_record(self):
"test adding records works"
prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
@ -970,7 +990,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""'))
@ -980,7 +1000,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"'))
@ -990,11 +1010,66 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""'))
def test_update_add_padding_rpc_to_dns(self):
prefix, txt = 'pad1textrec', ['"This is a test"', '', '']
prefix = 'rpc' + prefix
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text']
prefix = 'rpc' + prefix
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
prefix, txt = 'pad3textrec', ['', '', '"This is a test"']
prefix = 'rpc' + prefix
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
# Test is incomplete due to strlen against txt records
def test_update_add_null_char_txt_record(self):
"test adding records works"
@ -1003,7 +1078,7 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL'])
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL"'))
@ -1013,11 +1088,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, ['NULL', 'NULL'])
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"NULL" "NULL"'))
def test_update_add_null_char_rpc_to_dns(self):
prefix, txt = 'nulltextrec', ['NULL\x00BYTE']
prefix = 'rpc' + prefix
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, ['NULL'])
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
def test_update_add_hex_char_txt_record(self):
"test adding records works"
prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
@ -1025,11 +1119,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"'))
def test_update_add_hex_rpc_to_dns(self):
prefix, txt = 'hextextrec', ['HIGH\xFFBYTE']
prefix = 'rpc' + prefix
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
def test_update_add_slash_txt_record(self):
"test adding records works"
prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
@ -1037,11 +1150,33 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"'))
# This test fails against Windows as it eliminates slashes in RPC
# One typical use for a slash is in records like 'var=value' to
# escape '=' characters.
def test_update_add_slash_rpc_to_dns(self):
prefix, txt = 'slashtextrec', ['Th\\=is=is a test']
prefix = 'rpc' + prefix
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
def test_update_add_two_txt_records(self):
"test adding two txt records works"
prefix, txt = 'textrec2', ['"This is a test"',
@ -1050,12 +1185,34 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' +
' "\\"and this is a test, too\\""'))
def test_update_add_two_rpc_to_dns(self):
prefix, txt = 'textrec2', ['"This is a test"',
'"and this is a test, too"']
prefix = 'rpc' + prefix
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT,
'"\\"This is a test\\""' +
' "\\"and this is a test, too\\""')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
def test_update_add_empty_txt_records(self):
"test adding two txt records works"
prefix, txt = 'emptytextrec', []
@ -1063,11 +1220,30 @@ class TestRPCRoundtrip(DNSTest):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.check_query_txt(prefix, txt)
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server,
self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip,
self.get_dns_domain(),
"%s.%s" % (prefix, self.get_dns_domain()),
dnsp.DNS_TYPE_TXT, ''))
def test_update_add_empty_rpc_to_dns(self):
prefix, txt = 'rpcemptytextrec', []
name = "%s.%s" % (prefix, self.get_dns_domain())
rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '')
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, add_rec_buf, None)
self.check_query_txt(prefix, txt)
finally:
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip, self.get_dns_domain(),
name, None, add_rec_buf)
if __name__ == "__main__":
import unittest
unittest.main()

View File

@ -291,6 +291,7 @@ for f in sorted(os.listdir(os.path.join(samba4srcdir, "../pidl/tests"))):
# DNS tests
planpythontestsuite("fl2003dc:local", "samba.tests.dns")
for t in smbtorture4_testsuites("dns_internal."):
plansmbtorture4testsuite(t, "ad_dc_ntvfs:local", '//$SERVER/whavever')