
dns scavenging: Add extra tests for custom filter

Add extra tests for the custom ldb filter used by the dns scavenging
code.

Signed-off-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>

Autobuild-User(master): Andrew Bartlett <abartlet@samba.org>
Autobuild-Date(master): Mon Aug  6 05:36:43 CEST 2018 on sn-devel-144
Gary Lockyer 2018-08-02 14:52:16 +12:00 committed by Andrew Bartlett
parent 97702ffc1e
commit dea788e521
2 changed files with 126 additions and 9 deletions


@ -22,6 +22,7 @@ from samba.ndr import ndr_unpack, ndr_pack
from samba.samdb import SamDB
from samba.auth import system_session
import ldb
from ldb import ERR_OPERATIONS_ERROR
import os
import sys
import struct
@ -35,6 +36,8 @@ from samba import werror, WERRORError
from samba.tests.dns_base import DNSTest
import samba.getopt as options
import optparse
import samba.dcerpc.dnsp
parser = optparse.OptionParser("dns.py <server name> <server ip> [options]")
sambaopts = options.SambaOptions(parser)
@ -941,7 +944,6 @@ class TestZones(DNSTest):
lp=self.get_loadparm(),
session_info=system_session(),
credentials=self.creds)
self.zone_dn = "DC=" + self.zone +\
",CN=MicrosoftDNS,DC=DomainDNSZones," +\
str(self.samdb.get_default_basedn())
@ -1018,6 +1020,15 @@ class TestZones(DNSTest):
self.assertEqual(len(recs), 1)
return recs[0]
def dns_tombstone(self, prefix, txt, zone):
name = prefix + "." + zone
to = dnsp.DnssrvRpcRecord()
to.dwTimeStamp = 1000
to.wType = dnsp.DNS_TYPE_TOMBSTONE
self.samdb.dns_replace(name, [to])
def ldap_get_records(self, name):
# The use of SCOPE_SUBTREE here avoids raising an exception in the
# 0 results case for a test below.
@ -1206,19 +1217,22 @@ class TestZones(DNSTest):
name, txt = 'agingtest', ['test txt']
name2, txt2 = 'agingtest2', ['test txt2']
name3, txt3 = 'agingtest3', ['test txt3']
name4, txt4 = 'agingtest4', ['test txt4']
name5, txt5 = 'agingtest5', ['test txt5']
self.create_zone(self.zone, aging_enabled=True)
interval = 10
self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
Aging=1, zone=self.zone,
AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
self.dns_update_record(name, txt),
self.dns_update_record(name, txt)
self.dns_update_record(name2, txt),
self.dns_update_record(name2, txt2),
self.dns_update_record(name2, txt)
self.dns_update_record(name2, txt2)
self.dns_update_record(name3, txt),
self.dns_update_record(name3, txt2),
self.dns_update_record(name3, txt)
self.dns_update_record(name3, txt2)
last_update = self.dns_update_record(name3, txt3)
# Modify txt1 of the first 2 names
@ -1228,6 +1242,22 @@ class TestZones(DNSTest):
self.ldap_modify_dnsrecs(name, mod_ts)
self.ldap_modify_dnsrecs(name2, mod_ts)
# create a static dns record.
rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
rec_buf.rec = TXTRecord(txt4)
self.rpc_conn.DnssrvUpdateRecord2(
dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
self.server_ip,
self.zone,
name4,
rec_buf,
None)
# Create a tomb stoned record.
self.dns_update_record(name5, txt5)
self.dns_tombstone(name5, txt5, self.zone)
self.ldap_get_dns_records(name3)
expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
expr = expr.format(int(last_update.dwTimeStamp) - 1)
@ -1239,15 +1269,101 @@ class TestZones(DNSTest):
updated_names = {str(r.get('name')) for r in res}
self.assertEqual(updated_names, set([name, name2]))
def test_dns_tombstone_custom_match_rule_fail(self):
self.create_zone(self.zone, aging_enabled=True)
def test_dns_tombstone_custom_match_rule_no_records(self):
lp = self.get_loadparm()
self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
session_info=system_session(),
credentials=self.creds)
# The check here is that this does not blow up on silly input
expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
self.create_zone(self.zone, aging_enabled=True)
interval = 10
self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
Aging=1, zone=self.zone,
AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
expr = expr.format(1)
try:
res = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
except ldb.LdbError as e:
self.fail(str(e))
self.assertEqual(0, len(res))
def test_dns_tombstone_custom_match_rule_fail(self):
self.create_zone(self.zone, aging_enabled=True)
samdb = SamDB(url=lp.samdb_url(),
lp=lp,
session_info=system_session(),
credentials=self.creds)
# Property name in not dnsRecord
expr = "(dnsProperty:1.3.6.1.4.1.7165.4.5.3:=1)"
res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
self.assertEquals(len(res), 0)
# No value for tombstone time
try:
expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=)"
res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
self.assertEquals(len(res), 0)
self.fail("Exception: ldb.ldbError not generated")
except ldb.LdbError as e:
(num, msg) = e.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
# Tombstone time = -
try:
expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=-)"
res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
self.assertEquals(len(res), 0)
self.fail("Exception: ldb.ldbError not generated")
except ldb.LdbError as e:
(num, _) = e.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
# Tombstone time longer than 64 characters
try:
expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:={})"
expr = expr.format("1" * 65)
res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
self.assertEquals(len(res), 0)
self.fail("Exception: ldb.ldbError not generated")
except ldb.LdbError as e:
(num, _) = e.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
# Non numeric Tombstone time
try:
expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=expired)"
res = samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
self.assertEquals(len(res), 0)
self.fail("Exception: ldb.ldbError not generated")
except ldb.LdbError as e:
(num, _) = e.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
# Non system session
try:
db = SamDB(url="ldap://" + self.server_ip,
lp=self.get_loadparm(),
credentials=self.creds)
expr = "(dnsRecord:1.3.6.1.4.1.7165.4.5.3:=2)"
res = db.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expr, attrs=["*"])
self.assertEquals(len(res), 0)
self.fail("Exception: ldb.ldbError not generated")
except ldb.LdbError as e:
(num, _) = e.args
self.assertEquals(num, ERR_OPERATIONS_ERROR)
def test_basic_scavenging(self):
lp = self.get_loadparm()
self.samdb = SamDB(url=lp.samdb_url(), lp=lp,
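Review bot: In the "Non system session" case of test_dns_tombstone_custom_match_rule_fail, `db = SamDB(url="ldap://" + self.server_ip, ...)` is opened inside the `try:` whose handler only checks `num == ERR_OPERATIONS_ERROR`, so the case cannot tell a rejected match rule from a failed connection. If connecting or binding raises ldb.LdbError with that code (plausible, though not visible on this page), the test passes without the server ever evaluating the `dnsRecord:1.3.6.1.4.1.7165.4.5.3` filter for a non-system session, which is the access restriction this case is there to guard. Construct `db` before the `try:` so that only `db.search(...)` is covered by the handler. All five expected-error blocks would also read more directly as `with self.assertRaises(ldb.LdbError) as cm:` followed by `self.assertEqual(cm.exception.args[0], ERR_OPERATIONS_ERROR)`, which drops the redundant `self.assertEquals(len(res), 0)` before `self.fail(...)` and the deprecated `assertEquals` alias.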


@ -45,6 +45,7 @@ samba.tests.dns.__main__.TestZones.test_aging_refresh\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_basic_scavenging\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule_no_records\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_dns_tombstone_custom_match_rule_fail\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_dynamic_record_static_update\(rodc:local\)
samba.tests.dns.__main__.TestZones.test_static_record_dynamic_update\(rodc:local\)