diff --git a/python/samba/tests/dns.py b/python/samba/tests/dns.py index 25897a77b1d..3e655c37bc3 100644 --- a/python/samba/tests/dns.py +++ b/python/samba/tests/dns.py @@ -23,6 +23,7 @@ import samba.ndr as ndr from samba import credentials, param from samba.tests import TestCase from samba.dcerpc import dns, dnsp, dnsserver +from samba.netcmd.dns import TXTRecord, dns_record_match, data_to_dns_record # This timeout only has relevance when testing against Windows # Format errors tend to return patchy responses, so a timeout is needed. @@ -879,7 +880,7 @@ class TestZones(DNSTest): self.creds = self.get_credentials(self.lp) self.server = os.getenv("SERVER_IP") self.zone = "test.lan" - self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server), + self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip), self.lp, self.creds) def tearDown(self): @@ -899,7 +900,7 @@ class TestZones(DNSTest): zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, - self.server, + self.server_ip, None, 0, 'ZoneCreate', @@ -909,7 +910,7 @@ class TestZones(DNSTest): def delete_zone(self, zone): self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, - self.server, + self.server_ip, zone, 0, 'DeleteZoneFromDs', @@ -957,12 +958,31 @@ class TestRPCRoundtrip(DNSTest): self.lp = self.get_loadparm() self.creds = self.get_credentials(self.lp) self.server = os.getenv("SERVER_IP") - self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server), + self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip), self.lp, self.creds) def tearDown(self): super(TestRPCRoundtrip, self).tearDown() + def test_update_add_txt_rpc_to_dns(self): + prefix, txt = 'rpctextrec', ['"This is a test"'] + + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + def test_update_add_null_padded_txt_record(self): "test adding records works" prefix, txt = 'pad1textrec', ['"This is a test"', '', ''] @@ -970,7 +990,7 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""')) @@ -980,7 +1000,7 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"')) @@ -990,11 +1010,66 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""')) + def test_update_add_padding_rpc_to_dns(self): + prefix, txt = 'pad1textrec', ['"This is a test"', '', ''] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" ""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + + prefix, txt = 'pad2textrec', ['"This is a test"', '', '', 'more text'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"\\"This is a test\\"" "" "" "more text"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + + prefix, txt = 'pad3textrec', ['', '', '"This is a test"'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"" "" "\\"This is a test\\""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + # Test is incomplete due to strlen against txt records def test_update_add_null_char_txt_record(self): "test adding records works" @@ -1003,7 +1078,7 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, ['NULL']) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"NULL"')) @@ -1013,11 +1088,30 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, ['NULL', 'NULL']) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"NULL" "NULL"')) + def test_update_add_null_char_rpc_to_dns(self): + prefix, txt = 'nulltextrec', ['NULL\x00BYTE'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"NULL"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, ['NULL']) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + def test_update_add_hex_char_txt_record(self): "test adding records works" prefix, txt = 'hextextrec', ['HIGH\xFFBYTE'] @@ -1025,11 +1119,30 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"')) + def test_update_add_hex_rpc_to_dns(self): + prefix, txt = 'hextextrec', ['HIGH\xFFBYTE'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"HIGH\xFFBYTE"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + def test_update_add_slash_txt_record(self): "test adding records works" prefix, txt = 'slashtextrec', ['Th\\=is=is a test'] @@ -1037,11 +1150,33 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"')) + # This test fails against Windows as it eliminates slashes in RPC + # One typical use for a slash is in records like 'var=value' to + # escape '=' characters. + def test_update_add_slash_rpc_to_dns(self): + prefix, txt = 'slashtextrec', ['Th\\=is=is a test'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '"Th\\\\=is=is a test"') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + def test_update_add_two_txt_records(self): "test adding two txt records works" prefix, txt = 'textrec2', ['"This is a test"', @@ -1050,12 +1185,34 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '"\\"This is a test\\""' + ' "\\"and this is a test, too\\""')) + def test_update_add_two_rpc_to_dns(self): + prefix, txt = 'textrec2', ['"This is a test"', + '"and this is a test, too"'] + prefix = 'rpc' + prefix + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, + '"\\"This is a test\\""' + + ' "\\"and this is a test, too\\""') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + def test_update_add_empty_txt_records(self): "test adding two txt records works" prefix, txt = 'emptytextrec', [] @@ -1063,11 +1220,30 @@ class TestRPCRoundtrip(DNSTest): response = self.dns_transaction_udp(p) self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK) self.check_query_txt(prefix, txt) - self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server, + self.assertIsNotNone(dns_record_match(self.rpc_conn, self.server_ip, self.get_dns_domain(), "%s.%s" % (prefix, self.get_dns_domain()), dnsp.DNS_TYPE_TXT, '')) + def test_update_add_empty_rpc_to_dns(self): + prefix, txt = 'rpcemptytextrec', [] + + name = "%s.%s" % (prefix, self.get_dns_domain()) + + rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, '') + add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF() + add_rec_buf.rec = rec + try: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, add_rec_buf, None) + + self.check_query_txt(prefix, txt) + finally: + self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, + 0, self.server_ip, self.get_dns_domain(), + name, None, add_rec_buf) + if __name__ == "__main__": import unittest unittest.main() diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py index e284c2ca1fa..8f4976ca878 100755 --- a/source4/selftest/tests.py +++ b/source4/selftest/tests.py @@ -291,6 +291,7 @@ for f in sorted(os.listdir(os.path.join(samba4srcdir, "../pidl/tests"))): # DNS tests planpythontestsuite("fl2003dc:local", "samba.tests.dns") + for t in smbtorture4_testsuites("dns_internal."): plansmbtorture4testsuite(t, "ad_dc_ntvfs:local", '//$SERVER/whavever')