1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

python/tests: expand tests for dns server over rpc

Signed-off-by: Bob Campbell <bobcampbell@catalyst.net.nz>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Bob Campbell 2016-11-30 09:19:31 +13:00 committed by Garming Sam
parent b9c99a3483
commit 64a3825765
2 changed files with 656 additions and 71 deletions

View File

@ -17,20 +17,642 @@
"""Tests for samba.dcerpc.dnsserver"""
import os
import ldb
from samba.auth import system_session
from samba.samdb import SamDB
from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import dnsp, dnsserver
from samba.tests import RpcInterfaceTestCase, env_get_var_value
from samba.netcmd.dns import ARecord, NSRecord
from samba.netcmd.dns import ARecord, AAAARecord, PTRRecord, CNameRecord, NSRecord, MXRecord, SRVRecord, TXTRecord
class DnsserverTests(RpcInterfaceTestCase):
@classmethod
def setUpClass(cls):
good_dns = ["SAMDOM.EXAMPLE.COM",
"1.EXAMPLE.COM",
"%sEXAMPLE.COM" % ("1."*100),
"EXAMPLE",
"\n.COM",
"!@#$%^&*()_",
"HIGH\xFFBYTE",
"@.EXAMPLE.COM",
"."]
bad_dns = ["...",
".EXAMPLE.COM",
".EXAMPLE.",
"",
"SAMDOM..EXAMPLE.COM"]
good_mx = ["SAMDOM.EXAMPLE.COM 65535"]
bad_mx = []
good_srv = ["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
bad_srv = []
for bad_dn in bad_dns:
bad_mx.append("%s 1" % bad_dn)
bad_srv.append("%s 0 0 0" % bad_dn)
for good_dn in good_dns:
good_mx.append("%s 1" % good_dn)
good_srv.append("%s 0 0 0" % good_dn)
cls.good_records = {
"A": ["192.168.0.1",
"255.255.255.255"],
"AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
"0000:0000:0000:0000:0000:0000:0000:0000",
"1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
"1234:1234:1234::",
"1234:1234:1234:1234:1234::",
"1234:5678:9ABC:DEF0::",
"0000:0000::0000",
"1234::5678:9ABC:0000:0000:0000:0000",
"::1",
"::",
"1:1:1:1:1:1:1:1"],
"PTR": good_dns,
"CNAME": good_dns,
"NS": good_dns,
"MX": good_mx,
"SRV": good_srv,
"TXT": ["text", "", "@#!", "\n"]
}
cls.bad_records = {
"A": ["192.168.0.500",
"255.255.255.255/32"],
"AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
"0000:0000:0000:0000:0000:0000:0000:0000/1",
"AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
"1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
"1234:5678:9ABC:DEF0:1234:5678:9ABC",
"1111::1111::1111"],
"PTR": bad_dns,
"CNAME": bad_dns,
"NS": bad_dns,
"MX": bad_mx,
"SRV": bad_srv
}
# Because we use uint16_t for these numbers, we can't
# actually create these records.
invalid_mx = ["SAMDOM.EXAMPLE.COM -1",
"SAMDOM.EXAMPLE.COM 65536",
"%s 1" % "A"*256]
invalid_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
"SAMDOM.EXAMPLE.COM 0 0 65536",
"SAMDOM.EXAMPLE.COM 65536 0 0"]
cls.invalid_records = {
"MX": invalid_mx,
"SRV": invalid_srv
}
def setUp(self):
super(DnsserverTests, self).setUp()
self.server = env_get_var_value("SERVER_IP")
self.server = os.environ["DC_SERVER"]
self.zone = env_get_var_value("REALM").lower()
self.conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server),
self.get_loadparm(),
self.get_credentials())
self.samdb = SamDB(url="ldap://%s" % os.environ["DC_SERVER_IP"],
lp = self.get_loadparm(),
session_info=system_session(),
credentials=self.get_credentials())
self.custom_zone = "zone"
zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
zone_create_info.pszZoneName = self.custom_zone
zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
zone_create_info.fAging = 0
zone_create_info.fDsIntegrated = 1
zone_create_info.fLoadExisting = 1
zone_create_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
self.server,
None,
0,
'ZoneCreate',
dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
zone_create_info)
def tearDown(self):
self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
self.server,
self.custom_zone,
0,
'DeleteZoneFromDs',
dnsserver.DNSSRV_TYPEID_NULL,
None)
super(DnsserverTests, self).tearDown()
# This test fails against Samba (but passes against Windows),
# because Samba does not return the record when we enum records.
# Records can be given DNS_RANK_NONE when the zone they are in
# does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
# deleted, however, we do not consider this urgent to fix and
# so this test is a knownfail.
def test_rank_none(self):
"""
See what happens when we set a record's rank to
DNS_RANK_NONE.
"""
record_str = "192.168.50.50"
record_type_str = "A"
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
record.rank = 0 # DNS_RANK_NONE
res = self.samdb.dns_replace_by_dn(dn, [record])
if res is not None:
self.fail("Unable to update dns record to have DNS_RANK_NONE.")
self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
def test_dns_tombstoned(self):
"""
See what happens when we set a record to be tombstoned.
"""
record_str = "192.168.50.50"
record_type_str = "A"
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
dn, record = self.get_record_from_db(self.custom_zone, "testrecord")
record.wType = dnsp.DNS_TYPE_TOMBSTONE
res = self.samdb.dns_replace_by_dn(dn, [record])
if res is not None:
self.fail("Unable to update dns record to be tombstoned.")
self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str, 0)
def get_record_from_db(self, zone_name, record_name):
"""
Returns (dn of record, record)
"""
zones = self.samdb.search(base="DC=DomainDnsZones,%s" % self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
expression="(objectClass=dnsZone)",
attrs=["cn"])
zone_dn = None
for zone in zones:
if zone_name in str(zone.dn):
zone_dn = zone.dn
break
if zone_dn is None:
raise AssertionError("Couldn't find zone '%s'." % zone_name)
records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
expression="(objectClass=dnsNode)",
attrs=["dnsRecord"])
for old_packed_record in records:
if record_name in str(old_packed_record.dn):
return (old_packed_record.dn, ndr_unpack(dnsp.DnssrvRpcRecord, old_packed_record["dnsRecord"][0]))
def test_duplicate_matching(self):
"""
Make sure that records which should be distinct from each other or duplicate
to each other behave as expected.
"""
distinct_dns = [("SAMDOM.EXAMPLE.COM",
"SAMDOM.EXAMPLE.CO",
"EXAMPLE.COM", "SAMDOM.EXAMPLE")]
duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
("EXAMPLE.", "EXAMPLE")]
# Every tuple has entries which should be considered duplicate to one another.
duplicates = {
"AAAA": [("AAAA::", "aaaa::"),
("AAAA::", "AAAA:0000::"),
("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
("AAAA::", "AAAA:0:0:0:0:0:0:0"),
("0123::", "123::"),
("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
}
# Every tuple has entries which should be considered distinct from one another.
distinct = {
"A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
"AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
("1000::", "::1000"),
("::1", "::11", "::1111"),
("1234::", "0234::")],
"SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
"SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
"MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
"TXT": [("A RECORD", "B RECORD", "a record")]
}
for record_type_str in ("PTR", "CNAME", "NS"):
distinct[record_type_str] = distinct_dns
duplicates[record_type_str] = duplicate_dns
for record_type_str in duplicates:
for duplicate_tuple in duplicates[record_type_str]:
# Attempt to add duplicates and make sure that all after the first fails
self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
for record in duplicate_tuple:
self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
self.delete_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
# Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
for record in duplicate_tuple:
self.add_record(self.custom_zone, "testrecord", record_type_str, duplicate_tuple[0])
self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
for record_type_str in distinct:
for distinct_tuple in distinct[record_type_str]:
# Attempt to add distinct and make sure that they all succeed within a tuple
i = 0
for record in distinct_tuple:
i = i + 1
try:
self.add_record(self.custom_zone, "testrecord", record_type_str, record)
# All records should have been added.
self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=i)
except AssertionError as e:
raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
"Original error: %s\nDistinct set: %s." % (record, e, distinct_tuple))
for record in distinct_tuple:
self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
# CNAMEs should not have been added, since they conflict.
if record_type_str == 'CNAME':
continue
# Add the first distinct and attempt to remove all of the others, making sure this fails
# Windows fails this test. This is probably due to weird tombstoning behavior.
self.add_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
for record in distinct_tuple:
if record == distinct_tuple[0]:
continue
try:
self.delete_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
except AssertionError as e:
raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
% (distinct_tuple[0], record, e))
self.delete_record(self.custom_zone, "testrecord", record_type_str, distinct_tuple[0])
def test_accept_valid_commands(self):
"""
Make sure that we can add, update and delete a variety
of valid records.
"""
for record_type_str in self.good_records:
for record_str in self.good_records[record_type_str]:
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str)
def test_reject_invalid_commands(self):
"""
Make sure that we can't add a variety of invalid records,
and that we can't update valid records to invalid ones.
"""
num_failures = 0
for record_type_str in self.bad_records:
for record_str in self.bad_records[record_type_str]:
# Attempt to add the bad record, which should fail. Then, attempt to query for and delete
# it. Since it shouldn't exist, these should fail too.
try:
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
except AssertionError as e:
print e
num_failures = num_failures + 1
# Also try to update valid records to invalid ones, making sure this fails
for record_type_str in self.bad_records:
for record_str in self.bad_records[record_type_str]:
good_record_str = self.good_records[record_type_str][0]
self.add_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
try:
self.add_record(self.custom_zone, "testrecord", record_type_str, record_str, assertion=False)
except AssertionError as e:
print e
num_failures = num_failures + 1
self.delete_record(self.custom_zone, "testrecord", record_type_str, good_record_str)
self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
def test_add_duplicate_different_type(self):
"""
Attempt to add some values which have the same name as
existing ones, just a different type.
"""
num_failures = 0
for record_type_str_1 in self.good_records:
record1 = self.good_records[record_type_str_1][0]
self.add_record(self.custom_zone, "testrecord", record_type_str_1, record1)
for record_type_str_2 in self.good_records:
if record_type_str_1 == record_type_str_2:
continue
record2 = self.good_records[record_type_str_2][0]
has_a = record_type_str_1 == 'A' or record_type_str_2 == 'A'
has_aaaa = record_type_str_1 == 'AAAA' or record_type_str_2 == 'AAAA'
has_cname = record_type_str_1 == 'CNAME' or record_type_str_2 == 'CNAME'
has_ptr = record_type_str_1 == 'PTR' or record_type_str_2 == 'PTR'
has_mx = record_type_str_1 == 'MX' or record_type_str_2 == 'MX'
has_srv = record_type_str_1 == 'SRV' or record_type_str_2 == 'SRV'
has_txt = record_type_str_1 == 'TXT' or record_type_str_2 == 'TXT'
# If we attempt to add any record except A or AAAA when we already have an NS record,
# the add should fail.
add_error_ok = False
if record_type_str_1 == 'NS' and not has_a and not has_aaaa:
add_error_ok = True
# If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
add_error_ok = True
# If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
# If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
if has_cname and (has_a or has_aaaa or has_srv or has_txt):
add_error_ok = True
try:
self.add_record(self.custom_zone, "testrecord", record_type_str_2, record2)
if add_error_ok:
num_failures = num_failures + 1
print("Expected error when adding %s while a %s existed."
% (record_type_str_2, record_type_str_1))
except AssertionError as e:
if not add_error_ok:
num_failures = num_failures + 1
print("Didn't expect error when adding %s while a %s existed."
% (record_type_str_2, record_type_str_1))
if not add_error_ok:
# In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
expected_num_type_1 = 1
expected_num_type_2 = 1
# If we have an MX record, a PTR record should replace it when added.
# If we have a PTR record, an MX record should replace it when added.
if has_ptr and has_mx:
expected_num_type_1 = 0
# If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
if (has_cname or has_srv or has_txt) and (record_type_str_2 == 'PTR' or record_type_str_2 == 'MX'):
expected_num_type_1 = 0
if (record_type_str_1 == 'NS' and (has_a or has_aaaa)):
expected_num_type_2 = 0
try:
self.assert_num_records(self.custom_zone, "testrecord", record_type_str_1, expected_num=expected_num_type_1)
except AssertionError as e:
num_failures = num_failures + 1
print("Expected %s %s records after adding a %s record and a %s record already existed."
% (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
try:
self.assert_num_records(self.custom_zone, "testrecord", record_type_str_2, expected_num=expected_num_type_2)
except AssertionError as e:
num_failures = num_failures + 1
print("Expected %s %s records after adding a %s record and a %s record already existed."
% (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))
try:
self.delete_record(self.custom_zone, "testrecord", record_type_str_2, record2)
except AssertionError as e:
pass
self.delete_record(self.custom_zone, "testrecord", record_type_str_1, record1)
self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
# Windows fails this test in the same way we do.
def _test_cname(self):
"""
Test some special properties of CNAME records.
"""
# RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
cname_record = self.good_records["CNAME"][1]
self.add_record(self.custom_zone, "testrecord", "CNAME", cname_record)
for record_type_str in self.good_records:
other_record = self.good_records[record_type_str][0]
self.add_record(self.custom_zone, "testrecord", record_type_str, other_record, assertion=False)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=0)
# RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
mx_record = "testrecord 1"
ns_record = "testrecord"
self.add_record(self.custom_zone, "mxrec", "MX", mx_record, assertion=False)
self.add_record(self.custom_zone, "nsrec", "NS", ns_record, assertion=False)
self.delete_record(self.custom_zone, "testrecord", "CNAME", cname_record)
def test_add_duplicate_value(self):
"""
Make sure that we can't add duplicate values of any type.
"""
for record_type_str in self.good_records:
record = self.good_records[record_type_str][0]
self.add_record(self.custom_zone, "testrecord", record_type_str, record)
self.add_record(self.custom_zone, "testrecord", record_type_str, record, assertion=False)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record)
def test_add_similar_value(self):
"""
Attempt to add values with the same name and type in the same
zone. This should work, and should result in both values
existing (except with some types).
"""
for record_type_str in self.good_records:
for i in range(1, len(self.good_records[record_type_str])):
record1 = self.good_records[record_type_str][i-1]
record2 = self.good_records[record_type_str][i]
if record_type_str == 'CNAME':
continue
# We expect CNAME records to override one another, as
# an alias can only map to one CNAME record.
# Also, on Windows, when the empty string is added and
# another record is added afterwards, the empty string
# will be silently overriden by the new one, so it
# fails this test for the empty string.
expected_num = 1 if record_type_str == 'CNAME' else 2
self.add_record(self.custom_zone, "testrecord", record_type_str, record1)
self.add_record(self.custom_zone, "testrecord", record_type_str, record2)
self.assert_num_records(self.custom_zone, "testrecord", record_type_str, expected_num=expected_num)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record1)
self.delete_record(self.custom_zone, "testrecord", record_type_str, record2)
def assert_record(self, zone, name, record_type_str, expected_record_str,
assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
"""
Asserts whether or not the given record with the given type exists in the
given zone.
"""
try:
_, result = self.query_records(zone, name, record_type_str)
except RuntimeError as e:
if assertion:
raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
% (expected_record_str, record_type_str))
else:
return
found = False
for record in result.rec[0].records:
if record.data == expected_record_str:
found = True
break
if found and not assertion:
raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
elif not found and assertion:
raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
def assert_num_records(self, zone, name, record_type_str, expected_num=1,
client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
"""
Asserts that there are a given amount of records with the given type in
the given zone.
"""
try:
_, result = self.query_records(zone, name, record_type_str)
num_results = len(result.rec[0].records)
if not num_results == expected_num:
raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
% (num_results, record_type_str, name, expected_num))
except RuntimeError:
if not expected_num == 0:
raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
% (record_type_str, name, expected_num))
def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
return self.conn.DnssrvEnumRecords2(client_version,
0,
self.server,
zone,
name,
None,
self.record_type_int(record_type_str),
dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA | dnsserver.DNS_RPC_VIEW_NO_CHILDREN,
None,
None)
def record_obj_from_str(self, record_type_str, record_str):
if record_type_str == 'A':
return ARecord(record_str)
elif record_type_str == 'AAAA':
return AAAARecord(record_str)
elif record_type_str == 'PTR':
return PTRRecord(record_str)
elif record_type_str == 'CNAME':
return CNameRecord(record_str)
elif record_type_str == 'NS':
return NSRecord(record_str)
elif record_type_str == 'MX':
split = record_str.split(' ')
return MXRecord(split[0], int(split[1]))
elif record_type_str == 'SRV':
split = record_str.split(' ')
target = split[0]
port = int(split[1])
priority = int(split[2])
weight = int(split[3])
return SRVRecord(target, port, priority, weight)
elif record_type_str == 'TXT':
return TXTRecord(record_str)
def record_type_int(self, record_type_str):
if record_type_str == 'A':
return dnsp.DNS_TYPE_A
elif record_type_str == 'AAAA':
return dnsp.DNS_TYPE_AAAA
elif record_type_str == 'PTR':
return dnsp.DNS_TYPE_PTR
elif record_type_str == 'CNAME':
return dnsp.DNS_TYPE_CNAME
elif record_type_str == 'NS':
return dnsp.DNS_TYPE_NS
elif record_type_str == 'MX':
return dnsp.DNS_TYPE_MX
elif record_type_str == 'SRV':
return dnsp.DNS_TYPE_SRV
elif record_type_str == 'TXT':
return dnsp.DNS_TYPE_TXT
def add_record(self, zone, name, record_type_str, record_str,
assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
"""
Attempts to add a map from the given name to a record of the given type,
in the given zone.
Also asserts whether or not the add was successful.
This can also update existing records if they have the same name.
"""
record = self.record_obj_from_str(record_type_str, record_str)
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = record
try:
self.conn.DnssrvUpdateRecord2(client_version,
0,
self.server,
zone,
name,
add_rec_buf,
None)
if not assertion:
raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
% (record_str, record_type_str))
except RuntimeError as e:
if assertion:
raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
% (record_str, record_type_str, str(e)))
def delete_record(self, zone, name, record_type_str, record_str,
assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
"""
Attempts to delete a record with the given name, record and record type
from the given zone.
Also asserts whether or not the deletion was successful.
"""
record = self.record_obj_from_str(record_type_str, record_str)
del_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
del_rec_buf.rec = record
try:
self.conn.DnssrvUpdateRecord2(client_version,
0,
self.server,
zone,
name,
None,
del_rec_buf)
if not assertion:
raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
except RuntimeError as e:
if assertion:
raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(e)))
def test_query2(self):
typeid, result = self.conn.DnssrvQuery2(dnsserver.DNS_CLIENT_VERSION_W2K,
0,
@ -76,13 +698,13 @@ class DnsserverTests(RpcInterfaceTestCase):
request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
dnsserver.DNS_ZONE_REQUEST_PRIMARY)
typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
0,
self.server,
None,
'EnumZones',
dnsserver.DNSSRV_TYPEID_DWORD,
request_filter)
_, zones = self.conn.DnssrvComplexOperation2(client_version,
0,
self.server,
None,
'EnumZones',
dnsserver.DNSSRV_TYPEID_DWORD,
request_filter)
self.assertEquals(1, zones.dwZoneCount)
# Delete zone
@ -109,6 +731,7 @@ class DnsserverTests(RpcInterfaceTestCase):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
request_filter = (dnsserver.DNS_ZONE_REQUEST_FORWARD |
dnsserver.DNS_ZONE_REQUEST_PRIMARY)
typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
0,
self.server,
@ -117,7 +740,7 @@ class DnsserverTests(RpcInterfaceTestCase):
dnsserver.DNSSRV_TYPEID_DWORD,
request_filter)
self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
self.assertEquals(2, zones.dwZoneCount)
self.assertEquals(3, zones.dwZoneCount)
request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
dnsserver.DNS_ZONE_REQUEST_PRIMARY)
@ -131,25 +754,23 @@ class DnsserverTests(RpcInterfaceTestCase):
self.assertEquals(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
self.assertEquals(0, zones.dwZoneCount)
def test_enumrecords2(self):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
record_type = dnsp.DNS_TYPE_NS
select_flags = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
buflen, roothints = self.conn.DnssrvEnumRecords2(client_version,
0,
self.server,
'..RootHints',
'.',
None,
record_type,
select_flags,
None,
None)
_, roothints = self.conn.DnssrvEnumRecords2(client_version,
0,
self.server,
'..RootHints',
'.',
None,
record_type,
select_flags,
None,
None)
self.assertEquals(14, roothints.count) # 1 NS + 13 A records (a-m)
def test_updaterecords2(self):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
record_type = dnsp.DNS_TYPE_A
@ -170,16 +791,16 @@ class DnsserverTests(RpcInterfaceTestCase):
add_rec_buf,
None)
buflen, result = self.conn.DnssrvEnumRecords2(client_version,
0,
self.server,
self.zone,
name,
None,
record_type,
select_flags,
None,
None)
_, result = self.conn.DnssrvEnumRecords2(client_version,
0,
self.server,
self.zone,
name,
None,
record_type,
select_flags,
None,
None)
self.assertEquals(1, result.count)
self.assertEquals(1, result.rec[0].wRecordCount)
self.assertEquals(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
@ -235,42 +856,3 @@ class DnsserverTests(RpcInterfaceTestCase):
select_flags,
None,
None)
def test_updaterecords2_soa(self):
client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
record_type = dnsp.DNS_TYPE_NS
select_flags = (dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA |
dnsserver.DNS_RPC_VIEW_NO_CHILDREN)
nameserver = 'ns.example.local'
rec = NSRecord(nameserver)
# Add record
add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
add_rec_buf.rec = rec
self.conn.DnssrvUpdateRecord2(client_version,
0,
self.server,
self.zone,
'.',
add_rec_buf,
None)
buflen, result = self.conn.DnssrvEnumRecords2(client_version,
0,
self.server,
self.zone,
'@',
None,
record_type,
select_flags,
None,
None)
self.assertEquals(1, result.count)
self.assertEquals(2, result.rec[0].wRecordCount)
match = False
for i in range(2):
self.assertEquals(dnsp.DNS_TYPE_NS, result.rec[0].records[i].wType)
if result.rec[0].records[i].data.str.rstrip('.') == nameserver:
match = True
self.assertEquals(match, True)

View File

@ -310,3 +310,6 @@
^samba.tests.samba_tool.dnscmd.samba.tests.samba_tool.dnscmd.DnsCmdTestCase.test_rank_none
^samba.tests.samba_tool.dnscmd.samba.tests.samba_tool.dnscmd.DnsCmdTestCase.test_reject_invalid_commands
^samba.tests.samba_tool.dnscmd.samba.tests.samba_tool.dnscmd.DnsCmdTestCase.test_update_valid_type
^samba.tests.dcerpc.dnsserver.samba.tests.dcerpc.dnsserver.DnsserverTests.test_add_duplicate_different_type.*
^samba.tests.dcerpc.dnsserver.samba.tests.dcerpc.dnsserver.DnsserverTests.test_rank_none.*
^samba.tests.dcerpc.dnsserver.samba.tests.dcerpc.dnsserver.DnsserverTests.test_reject_invalid_commands