mirror of
https://github.com/samba-team/samba.git
synced 2025-12-20 16:23:51 +03:00
selftest: Add test confirming join-created DNS entries can be modified as the DC
This ensures that samba_dnsupdate can run in the long term against the new DNS entries Signed-off-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Garming Sam <garming@catalyst.net.nz>
This commit is contained in:
@@ -20,9 +20,10 @@ import samba
|
||||
import sys
|
||||
import shutil
|
||||
import os
|
||||
from samba.tests.dns_base import DNSTest
|
||||
from samba.tests.dns_base import DNSTKeyTest
|
||||
from samba.join import dc_join
|
||||
from samba.dcerpc import drsuapi, misc, dns
|
||||
from samba.credentials import Credentials
|
||||
|
||||
def get_logger(name="subunit"):
|
||||
"""Get a logger object."""
|
||||
@@ -31,11 +32,11 @@ def get_logger(name="subunit"):
|
||||
logger.addHandler(logging.StreamHandler(sys.stderr))
|
||||
return logger
|
||||
|
||||
class JoinTestCase(DNSTest):
|
||||
class JoinTestCase(DNSTKeyTest):
|
||||
def setUp(self):
|
||||
super(JoinTestCase, self).setUp()
|
||||
self.server = samba.tests.env_get_var_value("SERVER")
|
||||
self.server_ip = samba.tests.env_get_var_value("SERVER_IP")
|
||||
super(JoinTestCase, self).setUp()
|
||||
self.lp = samba.tests.env_loadparm()
|
||||
self.creds = self.get_credentials()
|
||||
self.netbios_name = "jointest1"
|
||||
@@ -58,6 +59,8 @@ class JoinTestCase(DNSTest):
|
||||
|
||||
self.join_ctx.force_all_ips = True
|
||||
|
||||
self.join_ctx.do_join()
|
||||
|
||||
def tearDown(self):
|
||||
try:
|
||||
paths = self.join_ctx.paths
|
||||
@@ -76,9 +79,7 @@ class JoinTestCase(DNSTest):
|
||||
super(JoinTestCase, self).tearDown()
|
||||
|
||||
|
||||
def test_join(self):
|
||||
|
||||
self.join_ctx.do_join()
|
||||
def test_join_makes_records(self):
|
||||
|
||||
"create a query packet containing one query record via TCP"
|
||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||
@@ -111,3 +112,64 @@ class JoinTestCase(DNSTest):
|
||||
self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
|
||||
self.assertEquals(response.answers[0].rdata, self.join_ctx.dnshostname)
|
||||
self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
|
||||
|
||||
|
||||
def test_join_records_can_update(self):
|
||||
dc_creds = Credentials()
|
||||
dc_creds.guess(self.join_ctx.lp)
|
||||
dc_creds.set_machine_account(self.join_ctx.lp)
|
||||
|
||||
self.tkey_trans(creds=dc_creds)
|
||||
|
||||
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
|
||||
q = self.make_name_question(self.join_ctx.dnsdomain,
|
||||
dns.DNS_QTYPE_SOA,
|
||||
dns.DNS_QCLASS_IN)
|
||||
questions = []
|
||||
questions.append(q)
|
||||
self.finish_name_packet(p, questions)
|
||||
|
||||
updates = []
|
||||
# Delete the old expected IPs
|
||||
IPs = samba.interface_ips(self.lp)
|
||||
for IP in IPs[1:]:
|
||||
if ":" in IP:
|
||||
r = dns.res_rec()
|
||||
r.name = self.join_ctx.dnshostname
|
||||
r.rr_type = dns.DNS_QTYPE_AAAA
|
||||
r.rr_class = dns.DNS_QCLASS_NONE
|
||||
r.ttl = 0
|
||||
r.length = 0xffff
|
||||
rdata = IP
|
||||
else:
|
||||
r = dns.res_rec()
|
||||
r.name = self.join_ctx.dnshostname
|
||||
r.rr_type = dns.DNS_QTYPE_A
|
||||
r.rr_class = dns.DNS_QCLASS_NONE
|
||||
r.ttl = 0
|
||||
r.length = 0xffff
|
||||
rdata = IP
|
||||
|
||||
r.rdata = rdata
|
||||
updates.append(r)
|
||||
|
||||
p.nscount = len(updates)
|
||||
p.nsrecs = updates
|
||||
|
||||
mac = self.sign_packet(p, self.key_name)
|
||||
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
|
||||
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
|
||||
self.verify_packet(response, response_p, mac)
|
||||
|
||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||
questions = []
|
||||
|
||||
name = self.join_ctx.dnshostname
|
||||
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
||||
questions.append(q)
|
||||
|
||||
self.finish_name_packet(p, questions)
|
||||
(response, response_packet) = self.dns_transaction_tcp(p, host=self.server_ip)
|
||||
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
|
||||
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
||||
self.assertEquals(response.ancount, 1)
|
||||
|
||||
Reference in New Issue
Block a user