1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

dns: record aging tests

First basic DNS record aging tests.  These check that we can
turn aging on and off, and that timestamps are written on DNS
add and update calls, but not RPC calls.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=10812

Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Reviewed-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Aaron Haslett 2018-05-09 18:02:28 +12:00 committed by Andrew Bartlett
parent d871e0c84c
commit c1552c70c5
4 changed files with 236 additions and 14 deletions

View File

@ -16,6 +16,12 @@
#
from __future__ import print_function
from samba import dsdb
from samba.ndr import ndr_unpack, ndr_pack
from samba.samdb import SamDB
from samba.auth import system_session
import ldb
import os
import sys
import struct
@ -652,11 +658,9 @@ class TestComplexQueries(DNSTest):
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
rdata = value
r.rdata = rdata
updates = [r]
r.rdata = value
p.nscount = 1
p.nsrecs = updates
p.nsrecs = [r]
(response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
@ -884,11 +888,22 @@ class TestZones(DNSTest):
self.timeout = timeout
self.zone = "test.lan"
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\
(self.server_ip),
self.lp, self.creds)
self.samdb = SamDB(url="ldap://" + self.server_ip,
lp = self.get_loadparm(),
session_info=system_session(),
credentials=self.creds)
self.zone_dn = "DC=" + self.zone +\
",CN=MicrosoftDNS,DC=DomainDNSZones," +\
str(self.samdb.get_default_basedn())
def tearDown(self):
super(TestZones, self).tearDown()
try:
self.delete_zone(self.zone)
except RuntimeError as e:
@ -896,15 +911,18 @@ class TestZones(DNSTest):
if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
raise
def create_zone(self, zone):
def create_zone(self, zone, aging_enabled=False):
zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
zone_create.pszZoneName = zone
zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
zone_create.fAging = 0
zone_create.fAging = int(aging_enabled)
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
zone_create.fDsIntegrated = 1
zone_create.fLoadExisting = 1
zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
try:
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
self.rpc_conn.DnssrvOperation2(client_version,
0,
self.server_ip,
None,
@ -915,6 +933,182 @@ class TestZones(DNSTest):
except WERRORError as e:
self.fail(str(e))
def set_params(self, **kwargs):
zone = kwargs.pop('zone', None)
for key,val in kwargs.items():
name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
name_param.dwParam = val
name_param.pszNodeName = key
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
try:
self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone,
0, 'ResetDwordProperty', nap_type,
name_param)
except WERRORError as e:
self.fail(str(e))
def ldap_modify_dnsrecs(self, name, func):
dn = 'DC={},{}'.format(name, self.zone_dn)
dns_recs = self.ldap_get_dns_records(name)
for rec in dns_recs:
func(rec)
update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]}
self.samdb.modify(ldb.Message.from_dict(self.samdb,
update_dict,
ldb.FLAG_MOD_REPLACE))
def dns_update_record(self, prefix, txt):
p = self.make_txt_update(prefix, txt, self.zone)
(code, response) = self.dns_transaction_udp(p, host=self.server_ip)
self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
recs = self.ldap_get_dns_records(prefix)
recs = [r for r in recs if r.data.str == txt]
self.assertEqual(len(recs), 1)
return recs[0]
def ldap_get_records(self, name):
dn = 'DC={},{}'.format(name, self.zone_dn)
expr = "(&(objectClass=dnsNode)(name={}))".format(name)
return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
def ldap_get_dns_records(self, name):
records = self.ldap_get_records(name)
return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
for r in records[0].get('dnsRecord')]
def ldap_get_zone_settings(self):
records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
expression="(&(objectClass=dnsZone)"+\
"(name={}))".format(self.zone),
attrs=["dNSProperty"])
self.assertEqual(len(records), 1)
props = [ndr_unpack(dnsp.DnsProperty, r)
for r in records[0].get('dNSProperty')]
#We have no choice but to repeat these here.
zone_prop_ids = {0x00: "EMPTY",
0x01: "TYPE",
0x02: "ALLOW_UPDATE",
0x08: "SECURE_TIME",
0x10: "NOREFRESH_INTERVAL",
0x11: "SCAVENGING_SERVERS",
0x12: "AGING_ENABLED_TIME",
0x20: "REFRESH_INTERVAL",
0x40: "AGING_STATE",
0x80: "DELETED_FROM_HOSTNAME",
0x81: "MASTER_SERVERS",
0x82: "AUTO_NS_SERVERS",
0x83: "DCPROMO_CONVERT",
0x90: "SCAVENGING_SERVERS_DA",
0x91: "MASTER_SERVERS_DA",
0x92: "NS_SERVERS_DA",
0x100: "NODE_DBFLAGS"}
return {zone_prop_ids[p.id].lower(): p.data for p in props}
def set_aging(self, enable=False):
self.create_zone(self.zone, aging_enabled=enable)
self.set_params(NoRefreshInterval=1, RefreshInterval=1,
Aging=int(bool(enable)), zone=self.zone,
AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
self.set_aging(enable=True)
settings = self.ldap_get_zone_settings()
self.assertTrue(settings['aging_state'] is not None)
self.assertTrue(settings['aging_state'])
rec = self.dns_update_record('agingtest', ['test txt'])
self.assertNotEqual(rec.dwTimeStamp, 0)
def test_set_aging_disabled(self):
self.set_aging(enable=False)
settings = self.ldap_get_zone_settings()
self.assertTrue(settings['aging_state'] is not None)
self.assertFalse(settings['aging_state'])
rec = self.dns_update_record('agingtest', ['test txt'])
self.assertNotEqual(rec.dwTimeStamp, 0)
def test_aging_update(self, enable=True):
name, txt = 'agingtest', ['test txt']
self.set_aging(enable=True)
before_mod = self.dns_update_record(name, txt)
if not enable:
self.set_params(zone=self.zone, Aging=0)
dec = 2
def mod_ts(rec):
rec.dwTimeStamp -= dec
self.ldap_modify_dnsrecs(name, mod_ts)
after_mod = self.ldap_get_dns_records(name)
self.assertEqual(len(after_mod), 1)
after_mod = after_mod[0]
self.assertEqual(after_mod.dwTimeStamp,
before_mod.dwTimeStamp - dec)
after_update = self.dns_update_record(name, txt)
after_should_equal = before_mod if enable else after_mod
self.assertEqual(after_should_equal.dwTimeStamp,
after_update.dwTimeStamp)
def test_aging_update_disabled(self):
self.test_aging_update(enable=False)
def test_aging_refresh(self):
name,txt = 'agingtest', ['test txt']
self.create_zone(self.zone, aging_enabled=True)
interval = 10
self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
Aging=1, zone=self.zone,
AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
before_mod = self.dns_update_record(name, txt)
def mod_ts(rec):
rec.dwTimeStamp -= interval/2
self.ldap_modify_dnsrecs(name, mod_ts)
update_during_norefresh = self.dns_update_record(name, txt)
def mod_ts(rec):
rec.dwTimeStamp -= interval + interval/2
self.ldap_modify_dnsrecs(name, mod_ts)
update_during_refresh = self.dns_update_record(name, txt)
self.assertEqual(update_during_norefresh.dwTimeStamp,
before_mod.dwTimeStamp - interval/2)
self.assertEqual(update_during_refresh.dwTimeStamp,
before_mod.dwTimeStamp)
def test_rpc_add_no_timestamp(self):
name,txt = 'agingtest', ['test txt']
self.set_aging(enable=True)
rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
rec_buf.rec = TXTRecord(txt)
self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0, self.server_ip,
self.zone, name, rec_buf, None)
recs = self.ldap_get_dns_records(name)
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0].dwTimeStamp, 0)
def test_basic_scavenging(self):
self.create_zone(self.zone, aging_enabled=True)
interval = 1
self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
zone=self.zone, Aging=1,
AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
name, txt = 'agingtest', ['test txt']
rec = self.dns_update_record(name,txt)
rec = self.dns_update_record(name+'2',txt)
def mod_ts(rec):
rec.dwTimeStamp -= interval*5
self.ldap_modify_dnsrecs(name, mod_ts)
dsdb._scavenge_dns_records(self.samdb)
recs = self.ldap_get_dns_records(name)
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
recs = self.ldap_get_dns_records(name+'2')
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,

View File

@ -166,18 +166,18 @@ class DNSTest(TestCaseInTempDir):
return (response, recv_packet[2:])
def make_txt_update(self, prefix, txt_array):
def make_txt_update(self, prefix, txt_array, domain=None):
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
updates = []
name = self.get_dns_domain()
name = domain or self.get_dns_domain()
u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
updates.append(u)
self.finish_name_packet(p, updates)
updates = []
r = dns.res_rec()
r.name = "%s.%s" % (prefix, self.get_dns_domain())
r.name = "%s.%s" % (prefix, name)
r.rr_type = dns.DNS_QTYPE_TXT
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
@ -190,8 +190,8 @@ class DNSTest(TestCaseInTempDir):
return p
def check_query_txt(self, prefix, txt_array):
name = "%s.%s" % (prefix, self.get_dns_domain())
def check_query_txt(self, prefix, txt_array, zone=None):
name = "%s.%s" % (prefix, zone or self.get_dns_domain())
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
questions = []

View File

@ -33,7 +33,23 @@ samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_padding_rpc_to_dns\(ro
samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_slash_rpc_to_dns\(rodc:local\)
samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_two_rpc_to_dns\(rodc:local\)
samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_txt_rpc_to_dns\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_set_aging_disabled
samba.tests.dns.__main__.TestZones.test_soa_query\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_set_aging\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_aging_update\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_aging_update_disabled\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_aging_refresh\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_basic_scavenging\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_set_aging\(vampire_dc:local\)
samba.tests.dns.__main__.TestZones.test_aging_update\(vampire_dc:local\)
samba.tests.dns.__main__.TestZones.test_aging_update_disabled\(vampire_dc:local\)
samba.tests.dns.__main__.TestZones.test_aging_refresh\(vampire_dc:local\)
samba.tests.dns.__main__.TestZones.test_basic_scavenging\(vampire_dc:local\)
samba.tests.dns.__main__.TestComplexQueries.test_cname_two_chain\(vampire_dc:local\)
samba.tests.dns.__main__.TestComplexQueries.test_one_a_query\(vampire_dc:local\)
samba.tests.dns.__main__.TestSimpleQueries.test_one_a_query\(vampire_dc:local\)

View File

@ -0,0 +1,12 @@
#
# Tests added for the dns scavenging changes
#
# Will be removed once the tests are implemented.
#
samba.tests.dns.__main__.TestZones.test_aging_refresh\(fl2003dc:local\)
samba.tests.dns.__main__.TestZones.test_aging_update\(fl2003dc:local\)
samba.tests.dns.__main__.TestZones.test_aging_update_disabled\(fl2003dc:local\)
samba.tests.dns.__main__.TestZones.test_basic_scavenging\(fl2003dc:local\)
samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(fl2003dc:local\)
samba.tests.dns.__main__.TestZones.test_set_aging\(fl2003dc:local\)
samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(vampire_dc:local\)