mirror of
https://github.com/samba-team/samba.git
synced 2025-01-25 06:04:04 +03:00
5e559528b3
It worked accidentally, like all our tombstone tests. Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
1313 lines
65 KiB
Python
1313 lines
65 KiB
Python
# Unix SMB/CIFS implementation.
|
|
# Copyright (C) Amitay Isaacs <amitay@gmail.com> 2011
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
"""Tests for samba.dcerpc.dnsserver"""
|
|
|
|
import os
|
|
import ldb
|
|
|
|
from samba.auth import system_session
|
|
from samba.samdb import SamDB
|
|
from samba.ndr import ndr_unpack, ndr_pack
|
|
from samba.dcerpc import dnsp, dnsserver, security
|
|
from samba.tests import RpcInterfaceTestCase, env_get_var_value
|
|
from samba.dnsserver import record_from_string, flag_from_string, ARecord
|
|
from samba import sd_utils, descriptor
|
|
from samba import WERRORError, werror
|
|
|
|
|
|
class DnsserverTests(RpcInterfaceTestCase):
|
|
|
|
@classmethod
def setUpClass(cls):
    """Populate the class-level tables of record strings used by the tests.

    Sets ``cls.good_records``, ``cls.bad_records`` and
    ``cls.invalid_records``; each maps a record type name to a list of
    record data strings.
    """
    valid_names = ["SAMDOM.EXAMPLE.COM",
                   "1.EXAMPLE.COM",
                   "%sEXAMPLE.COM" % ("1." * 100),
                   "EXAMPLE",
                   "\n.COM",
                   "!@#$%^&*()_",
                   "HIGH\xFFBYTE",
                   "@.EXAMPLE.COM",
                   "."]
    rejected_names = ["...",
                      ".EXAMPLE.COM",
                      ".EXAMPLE.",
                      "",
                      "SAMDOM..EXAMPLE.COM"]

    # MX data is a name followed by one number, SRV data a name followed
    # by three; derive both from the name lists above.
    valid_mx = ["SAMDOM.EXAMPLE.COM 65535"] + ["%s 1" % n for n in valid_names]
    valid_srv = (["SAMDOM.EXAMPLE.COM 65535 65535 65535"]
                 + ["%s 0 0 0" % n for n in valid_names])
    rejected_mx = ["%s 1" % n for n in rejected_names]
    rejected_srv = ["%s 0 0 0" % n for n in rejected_names]

    # PTR, CNAME and NS intentionally share the very same list object.
    cls.good_records = {
        "A": ["192.168.0.1",
              "255.255.255.255"],
        "AAAA": ["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
                 "0000:0000:0000:0000:0000:0000:0000:0000",
                 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
                 "1234:1234:1234::",
                 "1234:1234:1234:1234:1234::",
                 "1234:5678:9ABC:DEF0::",
                 "0000:0000::0000",
                 "1234::5678:9ABC:0000:0000:0000:0000",
                 "::1",
                 "::",
                 "1:1:1:1:1:1:1:1"],
        "PTR": valid_names,
        "CNAME": valid_names,
        "NS": valid_names,
        "MX": valid_mx,
        "SRV": valid_srv,
        "TXT": ["text", "", "@#!", "\n"]
    }

    cls.bad_records = {
        "A": ["192.168.0.500",
              "255.255.255.255/32"],
        "AAAA": ["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
                 "0000:0000:0000:0000:0000:0000:0000:0000/1",
                 "AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
                 "1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
                 "1234:5678:9ABC:DEF0:1234:5678:9ABC",
                 "1111::1111::1111"],
        "PTR": rejected_names,
        "CNAME": rejected_names,
        "NS": rejected_names,
        "MX": rejected_mx,
        "SRV": rejected_srv
    }

    # Because we use uint16_t for these numbers, we can't
    # actually create these records.
    cls.invalid_records = {
        "MX": ["SAMDOM.EXAMPLE.COM -1",
               "SAMDOM.EXAMPLE.COM 65536",
               "%s 1" % ("A" * 256)],
        "SRV": ["SAMDOM.EXAMPLE.COM 0 65536 0",
                "SAMDOM.EXAMPLE.COM 0 0 65536",
                "SAMDOM.EXAMPLE.COM 65536 0 0"]
    }
|
|
|
|
def setUp(self):
    """Open the RPC and LDAP connections and create the scratch zone.

    The server name and address come from the DC_SERVER and DC_SERVER_IP
    environment variables; the realm comes from env_get_var_value("REALM").
    A DS-integrated primary zone called "zone" is created for each test.
    """
    super().setUp()
    self.server = os.environ["DC_SERVER"]
    self.zone = env_get_var_value("REALM").lower()

    # Signed DCE/RPC connection to the dnsserver pipe.
    binding = f"ncacn_ip_tcp:{self.server}[sign]"
    self.conn = dnsserver.dnsserver(binding,
                                    self.get_loadparm(),
                                    self.get_credentials())

    ldap_url = f"ldap://{os.environ['DC_SERVER_IP']}"
    self.samdb = SamDB(url=ldap_url,
                       lp=self.get_loadparm(),
                       session_info=system_session(),
                       credentials=self.get_credentials())

    self.custom_zone = "zone"
    zone_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_info.pszZoneName = self.custom_zone
    zone_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_info.fAging = 0
    zone_info.fDsIntegrated = 1
    zone_info.fLoadExisting = 1
    zone_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT

    self.conn.DnssrvOperation2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server,
        None,
        0,
        'ZoneCreate',
        dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
        zone_info)
|
|
|
|
def tearDown(self):
    """Delete the scratch zone created by setUp(), then run base teardown.

    The base-class tearDown() is run from a ``finally`` clause so that it
    still happens when the zone deletion RPC raises; any such exception
    is then propagated as before.
    """
    try:
        self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                   0,
                                   self.server,
                                   self.custom_zone,
                                   0,
                                   'DeleteZoneFromDs',
                                   dnsserver.DNSSRV_TYPEID_NULL,
                                   None)
    finally:
        super(DnsserverTests, self).tearDown()
|
|
|
|
def test_enum_is_sorted(self):
    """
    Enumerating the zone root must return the nodes in sorted order.
    """
    rtype = "A"
    address = "192.168.50.50"

    # Added out of order: "-0" goes in last.
    for suffix in ("1", "2", "3", "4", "0"):
        self.add_record(self.custom_zone, "atestrecord-" + suffix, rtype, address)

    # This becomes an extra A on the zone itself by server-side magic
    self.add_record(self.custom_zone, self.custom_zone, rtype, address)

    _, result = self.conn.DnssrvEnumRecords2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server,
        self.custom_zone,
        "@",
        None,
        flag_from_string(rtype),
        dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
        None,
        None)

    # The zone node itself (empty name) comes first, then the children.
    expected_names = ["",
                      "atestrecord-0",
                      "atestrecord-1",
                      "atestrecord-2",
                      "atestrecord-3",
                      "atestrecord-4"]
    self.assertEqual(len(result.rec), 6)
    for position, node_name in enumerate(expected_names):
        self.assertEqual(result.rec[position].dnsNodeName.str, node_name)
|
|
|
|
def test_enum_is_sorted_with_zone_dup(self):
    """
    Enumeration stays sorted when a child node is renamed over LDAP so
    that its name equals the zone name.
    """
    rtype = "A"
    address = "192.168.50.50"

    # Added out of order: "-0" goes in last.
    for suffix in ("1", "2", "3", "4", "0"):
        self.add_record(self.custom_zone, "atestrecord-" + suffix, rtype, address)

    # This triggers a bug in old Samba
    dup_name = self.custom_zone + "1"
    self.add_record(self.custom_zone, dup_name, rtype, address)

    dn, _ = self.get_record_from_db(self.custom_zone, dup_name)

    # Rename the node behind the RPC server's back so that its leading
    # DN component carries the zone's own name.
    renamed_dn = ldb.Dn(self.samdb, str(dn))
    renamed_dn.set_component(0, "dc", self.custom_zone)
    self.samdb.rename(dn, renamed_dn)

    _, result = self.conn.DnssrvEnumRecords2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server,
        self.custom_zone,
        "@",
        None,
        flag_from_string(rtype),
        dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
        None,
        None)

    expected_names = ["",
                      "atestrecord-0",
                      "atestrecord-1",
                      "atestrecord-2",
                      "atestrecord-3",
                      "atestrecord-4"]
    self.assertEqual(len(result.rec), 7)
    for position, node_name in enumerate(expected_names):
        self.assertEqual(result.rec[position].dnsNodeName.str, node_name)

    # Windows doesn't reload the zone fast enough, but doesn't
    # have the bug anyway, it will sort last on both names (where
    # it should)
    last_name = result.rec[6].dnsNodeName.str
    if last_name != dup_name:
        self.assertEqual(result.rec[6].dnsNodeName.str, self.custom_zone)
|
|
|
|
def test_enum_is_sorted_children_prefix_first(self):
    """
    Enumerating below "a.b" returns the selected node first.

    Only the count and the first entry are checked, because Samba is
    flappy for the full sort.
    """
    rtype = "A"
    address = "192.168.50.50"

    for suffix in ("1", "2", "3", "4", "0"):
        self.add_record(self.custom_zone, "atestrecord-%s.a.b" % suffix, rtype, address)

    # Not expected to be returned
    self.add_record(self.custom_zone, "atestrecord-0.b.b", rtype, address)

    _, result = self.conn.DnssrvEnumRecords2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server,
        self.custom_zone,
        "a.b",
        None,
        flag_from_string(rtype),
        dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
        None,
        None)

    self.assertEqual(len(result.rec), 6)
    first_entry = result.rec[0]
    self.assertEqual(first_entry.dnsNodeName.str, "")
|
|
|
|
def test_enum_is_sorted_children(self):
    """
    Enumerating below "a.b" must return its children in sorted order.
    """
    rtype = "A"
    address = "192.168.50.50"

    # Added out of order: "-0" goes in last.
    for suffix in ("1", "2", "3", "4", "0"):
        self.add_record(self.custom_zone, "atestrecord-%s.a.b" % suffix, rtype, address)

    # Not expected to be returned
    self.add_record(self.custom_zone, "atestrecord-0.b.b", rtype, address)

    _, result = self.conn.DnssrvEnumRecords2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        self.server,
        self.custom_zone,
        "a.b",
        None,
        flag_from_string(rtype),
        dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
        None,
        None)

    # The selected node itself (empty name) leads, then its children.
    expected_names = ["",
                      "atestrecord-0",
                      "atestrecord-1",
                      "atestrecord-2",
                      "atestrecord-3",
                      "atestrecord-4"]
    self.assertEqual(len(result.rec), 6)
    for position, node_name in enumerate(expected_names):
        self.assertEqual(result.rec[position].dnsNodeName.str, node_name)
|
|
|
|
# This test fails against Samba (but passes against Windows),
|
|
# because Samba does not return the record when we enum records.
|
|
# Records can be given DNS_RANK_NONE when the zone they are in
|
|
# does not have DNS_ZONE_TYPE_PRIMARY. Since such records can be
|
|
# deleted, however, we do not consider this urgent to fix and
|
|
# so this test is a knownfail.
|
|
def test_rank_none(self):
    """
    Rewrite a record with rank DNS_RANK_NONE and check that it can
    still be enumerated, not re-added, and deleted.
    """
    node = "testrecord"
    rtype = "A"
    address = "192.168.50.50"
    self.add_record(self.custom_zone, node, rtype, address)

    dn, record = self.get_record_from_db(self.custom_zone, node)
    record.rank = 0  # DNS_RANK_NONE
    outcome = self.samdb.dns_replace_by_dn(dn, [record])
    if outcome is not None:
        self.fail("Unable to update dns record to have DNS_RANK_NONE.")

    # Exactly one record should still be visible over RPC ...
    self.assert_num_records(self.custom_zone, node, rtype)
    # ... adding the same value again should be refused ...
    self.add_record(self.custom_zone, node, rtype, address, assertion=False)
    # ... and deleting it should leave nothing behind.
    self.delete_record(self.custom_zone, node, rtype, address)
    self.assert_num_records(self.custom_zone, node, rtype, 0)
|
|
|
|
def test_dns_tombstoned_zero_timestamp(self):
    """What happens with a zero EntombedTime tombstone?"""
    # samdb.dns_replace_by_dn() exposes dns_common_replace(), which is
    # *NOT* a general purpose record replacement function: it is a
    # specialised part of the dns update mechanism (for both DLZ and
    # internal), and it gives a zero-timestamp tombstone a special
    # meaning.
    #
    # Earlier stages of update handling mark a record for deletion by
    # turning it into a tombstone with a zero timestamp.
    # dns_common_replace() notices that marker and, if no other records
    # remain, marks the node as tombstoned, adding a "real" tombstone
    # in the process.
    #
    # A tombstone with a non-zero timestamp (see the next test) makes
    # dns_common_replace() decide the node is already tombstoned, so it
    # takes no action.
    #
    # This test has historically passed entirely by accident.
    # NOTE(review): the original comment explaining why was left
    # unfinished ("as changing the wType appears to ...").
    node = "testrecord"
    address = "192.168.50.50"
    self.add_record(self.custom_zone, node, 'A', address)

    dn, record = self.get_record_from_db(self.custom_zone, node)
    record.wType = dnsp.DNS_TYPE_TOMBSTONE
    record.data = 0
    self.samdb.dns_replace_by_dn(dn, [record])

    # there should be no A record, and one TOMBSTONE record.
    self.assert_num_records(self.custom_zone, node, 'A', 0)
    # We can't make assertions about the tombstone count based on RPC
    # calls, as there are no tombstones in RPCs (there is
    # "DNS_TYPE_ZERO" instead). Nor do tombstones show up if we use
    # DNS_TYPE_ALL.
    self.assert_num_records(self.custom_zone, node, 'ALL', 0)

    # But we can use LDAP:
    records = self.ldap_get_records(self.custom_zone, node)
    self.assertEqual(len(records), 1)
    tombstone = records[0]
    self.assertEqual(tombstone.wType, dnsp.DNS_TYPE_TOMBSTONE)
    self.assertGreater(tombstone.data, 1e17)  # ~ October 1916

    # this should fail, because no A records.
    self.delete_record(self.custom_zone, node, 'A', address,
                       assertion=False)
|
|
|
|
def test_dns_tombstoned_nonzero_timestamp(self):
    """Replace a record with a tombstone carrying a non-zero
    EntombedTime and check that the original A record survives.
    """
    # With a non-zero EntombedTime, dns_common_replace() decides the
    # node was already tombstoned and does nothing, so the A record
    # stays where it was.
    node = "testrecord"
    address = "192.168.50.50"
    self.add_record(self.custom_zone, node, 'A', address)

    dn, record = self.get_record_from_db(self.custom_zone, node)
    record.wType = dnsp.DNS_TYPE_TOMBSTONE
    record.data = 0x123456789A
    self.samdb.dns_replace_by_dn(dn, [record])

    # there should be the A record and no TOMBSTONE
    self.assert_num_records(self.custom_zone, node, 'A', 1)
    self.assert_num_records(self.custom_zone, node, 'TOMBSTONE', 0)

    # this should succeed
    self.delete_record(self.custom_zone, node, 'A', address,
                       assertion=True)
    self.assert_num_records(self.custom_zone, node, 'TOMBSTONE', 0)
    self.assert_num_records(self.custom_zone, node, 'A', 0)
|
|
|
|
def get_record_from_db(self, zone_name, record_name):
    """
    Look a DNS node up over LDAP.

    Returns (dn of record, record), where the record is the first
    dnsRecord value of the first dnsNode whose DN contains record_name.
    Returns None when no node matches, and raises AssertionError when
    the zone cannot be found. Both lookups are substring matches on the
    DN string.
    """
    search_base = "DC=DomainDnsZones,%s" % self.samdb.get_default_basedn()
    zones = self.samdb.search(base=search_base,
                              scope=ldb.SCOPE_SUBTREE,
                              expression="(objectClass=dnsZone)",
                              attrs=["cn"])

    zone_marker = "DC=%s," % zone_name
    zone_dn = next((z.dn for z in zones if zone_marker in str(z.dn)), None)
    if zone_dn is None:
        raise AssertionError("Couldn't find zone '%s'." % zone_name)

    nodes = self.samdb.search(base=zone_dn,
                              scope=ldb.SCOPE_SUBTREE,
                              expression="(objectClass=dnsNode)",
                              attrs=["dnsRecord"])

    for node in nodes:
        if record_name not in str(node.dn):
            continue
        unpacked = ndr_unpack(dnsp.DnssrvRpcRecord, node["dnsRecord"][0])
        return (node.dn, unpacked)

    return None
|
|
|
|
def ldap_get_records(self, zone, name):
    """Fetch all dnsRecord values of one node over LDAP and unpack them.

    zone is the zone name under CN=MicrosoftDNS in DomainDNSZones and
    name is the node name to look for.

    Returns a list of unpacked dnsp.DnssrvRpcRecord objects taken from
    the first matching dnsNode. The list is empty when no node matches
    or the node carries no dnsRecord attribute, so callers can assert on
    its length instead of hitting an IndexError or TypeError.
    """
    zone_dn = (f"DC={zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
               f"{self.samdb.get_default_basedn()}")

    # Escape the name so that filter metacharacters in it cannot change
    # the meaning of the search expression.
    expr = f"(&(objectClass=dnsNode)(name={ldb.binary_encode(name)}))"
    nodes = self.samdb.search(base=zone_dn,
                              scope=ldb.SCOPE_SUBTREE,
                              expression=expr,
                              attrs=["dnsRecord"])
    if len(nodes) == 0:
        return []

    records = nodes[0].get('dnsRecord')
    if records is None:
        return []
    return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records]
|
|
|
|
def test_duplicate_matching(self):
    """
    Check which record values the server treats as duplicates of each
    other and which it treats as distinct.
    """
    zone = self.custom_zone
    node = "testrecord"

    distinct_dns = [("SAMDOM.EXAMPLE.COM",
                     "SAMDOM.EXAMPLE.CO",
                     "EXAMPLE.COM", "SAMDOM.EXAMPLE")]
    duplicate_dns = [("SAMDOM.EXAMPLE.COM", "samdom.example.com", "SAMDOM.example.COM"),
                     ("EXAMPLE.", "EXAMPLE")]

    # Every tuple has entries which should be considered duplicate to one another.
    duplicates = {
        "AAAA": [("AAAA::", "aaaa::"),
                 ("AAAA::", "AAAA:0000::"),
                 ("AAAA::", "AAAA:0000:0000:0000:0000:0000:0000:0000"),
                 ("AAAA::", "AAAA:0:0:0:0:0:0:0"),
                 ("0123::", "123::"),
                 ("::", "::0", "0000:0000:0000:0000:0000:0000:0000:0000")],
    }

    # Every tuple has entries which should be considered distinct from one another.
    distinct = {
        "A": [("192.168.1.0", "192.168.1.1", "192.168.2.0", "192.169.1.0", "193.168.1.0")],
        "AAAA": [("AAAA::1234:5678:9ABC", "::AAAA:1234:5678:9ABC"),
                 ("1000::", "::1000"),
                 ("::1", "::11", "::1111"),
                 ("1234::", "0234::")],
        "SRV": [("SAMDOM.EXAMPLE.COM 1 1 1", "SAMDOM.EXAMPLE.COM 1 1 0", "SAMDOM.EXAMPLE.COM 1 0 1",
                 "SAMDOM.EXAMPLE.COM 0 1 1", "SAMDOM.EXAMPLE.COM 2 1 0", "SAMDOM.EXAMPLE.COM 2 2 2")],
        "MX": [("SAMDOM.EXAMPLE.COM 1", "SAMDOM.EXAMPLE.COM 0")],
        "TXT": [("A RECORD", "B RECORD", "a record")]
    }

    # The name-valued types all share the same name tuples.
    for name_type in ("PTR", "CNAME", "NS"):
        distinct[name_type] = distinct_dns
        duplicates[name_type] = duplicate_dns

    for rtype, dup_tuples in duplicates.items():
        for dup_tuple in dup_tuples:
            first = dup_tuple[0]

            # Attempt to add duplicates and make sure that all after the first fails
            self.add_record(zone, node, rtype, first)
            for value in dup_tuple:
                self.add_record(zone, node, rtype, value, assertion=False)
                self.assert_num_records(zone, node, rtype)
            self.delete_record(zone, node, rtype, first)

            # Repeatedly: add the first duplicate, and attempt to remove all of the others, making sure this succeeds
            for value in dup_tuple:
                self.add_record(zone, node, rtype, first)
                self.delete_record(zone, node, rtype, value)

    for rtype, distinct_tuples in distinct.items():
        for distinct_tuple in distinct_tuples:
            first = distinct_tuple[0]

            # Attempt to add distinct and make sure that they all succeed within a tuple
            for count, value in enumerate(distinct_tuple, 1):
                try:
                    self.add_record(zone, node, rtype, value)
                    # All records should have been added.
                    self.assert_num_records(zone, node, rtype, expected_num=count)
                except AssertionError as e:
                    raise AssertionError("Failed to add %s, which should be distinct from all others in the set. "
                                         "Original error: %s\nDistinct set: %s." % (value, e, distinct_tuple))

            for value in distinct_tuple:
                self.delete_record(zone, node, rtype, value)
                # CNAMEs should not have been added, since they conflict.
                # NOTE(review): the nesting of this check was
                # reconstructed; as the last statement of the loop body
                # the `continue` has no effect -- confirm the intent.
                if rtype == 'CNAME':
                    continue

            # Add the first distinct and attempt to remove all of the others, making sure this fails
            # Windows fails this test. This is probably due to weird tombstoning behavior.
            self.add_record(zone, node, rtype, first)
            for value in distinct_tuple:
                if value == first:
                    continue
                try:
                    self.delete_record(zone, node, rtype, value, assertion=False)
                except AssertionError as e:
                    raise AssertionError("Managed to remove %s by attempting to remove %s. Original error: %s"
                                         % (first, value, e))
            self.delete_record(zone, node, rtype, first)
|
|
|
|
def test_accept_valid_commands(self):
    """
    Every entry of good_records can be added, shows up as exactly one
    record, and can be deleted again.
    """
    zone = self.custom_zone
    for rtype, values in self.good_records.items():
        for value in values:
            self.add_record(zone, "testrecord", rtype, value)
            self.assert_num_records(zone, "testrecord", rtype)
            self.delete_record(zone, "testrecord", rtype, value)
|
|
|
|
def check_params(self, wDataLength, rank, flags, dwTtlSeconds, dwReserved, data,
                 wType, dwTimeStamp=0, zone="zone", rec_name="testrecord"):
    """
    Compare the stored fields of a record against expected values.

    The record is fetched with get_record_from_db(zone, rec_name); the
    test fails if it is missing or if any field differs. The data field
    is compared case-insensitively.
    """
    def mismatch(what, got, expected):
        # Build the failure message shared by all field comparisons.
        return "Unexpected %s for record %s. Got %s, expected %s." % (what, data, got, expected)

    found = self.get_record_from_db(zone, rec_name)
    self.assertIsNotNone(found, "Expected record %s but was not found over LDAP." % data)
    (rec_dn, rec) = found

    self.assertEqual(wDataLength, rec.wDataLength,
                     mismatch("data length", rec.wDataLength, wDataLength))
    self.assertEqual(rank, rec.rank,
                     mismatch("rank", rec.rank, rank))
    self.assertEqual(flags, rec.flags,
                     mismatch("flags", rec.flags, flags))
    self.assertEqual(dwTtlSeconds, rec.dwTtlSeconds,
                     mismatch("time to live", rec.dwTtlSeconds, dwTtlSeconds))
    self.assertEqual(dwReserved, rec.dwReserved,
                     mismatch("dwReserved", rec.dwReserved, dwReserved))
    self.assertEqual(data.lower(), rec.data.lower(),
                     mismatch("data", rec.data.lower(), data.lower()))
    self.assertEqual(wType, rec.wType,
                     mismatch("wType", rec.wType, wType))
    self.assertEqual(dwTimeStamp, rec.dwTimeStamp,
                     mismatch("timestamp", rec.dwTimeStamp, dwTimeStamp))
|
|
|
|
def test_record_params(self):
    """
    Records added over RPC are stored with the expected data length,
    rank, flags, TTL, reserved field, data and type.
    """
    # (type name, value added, expected wDataLength,
    #  expected stored data, expected wType)
    cases = (
        ("A", "192.168.50.50", 4, "192.168.50.50", 1),
        ("AAAA", "AAAA:AAAA::", 16, "AAAA:AAAA:0000:0000:0000:0000:0000:0000", 28),
        ("CNAME", "cnamedest", 13, "cnamedest", 5),
    )
    for rtype, value, data_length, stored_data, wtype in cases:
        self.add_record(self.custom_zone, "testrecord", rtype, value)
        self.check_params(data_length, 240, 0, 900, 0, stored_data, wtype)
        self.delete_record(self.custom_zone, "testrecord", rtype, value)
|
|
|
|
def test_reject_invalid_commands(self):
    """
    Entries of bad_records can neither be added on their own nor be
    added next to an existing valid record of the same type.

    Failures are printed and counted, and reported together at the end.
    """
    zone = self.custom_zone
    node = "testrecord"
    num_failures = 0

    for rtype, bad_values in self.bad_records.items():
        for bad_value in bad_values:
            # Attempt to add the bad record, which should fail. Then, attempt to query for and delete
            # it. Since it shouldn't exist, these should fail too.
            try:
                self.add_record(zone, node, rtype, bad_value, assertion=False)
                self.assert_num_records(zone, node, rtype, expected_num=0)
                self.delete_record(zone, node, rtype, bad_value, assertion=False)
            except AssertionError as e:
                print(e)
                num_failures += 1

    # Also try to update valid records to invalid ones, making sure this fails
    for rtype, bad_values in self.bad_records.items():
        for bad_value in bad_values:
            good_value = self.good_records[rtype][0]
            self.add_record(zone, node, rtype, good_value)
            try:
                self.add_record(zone, node, rtype, bad_value, assertion=False)
            except AssertionError as e:
                print(e)
                num_failures += 1
            self.delete_record(zone, node, rtype, good_value)

    self.assertTrue(num_failures == 0, "Failed to reject invalid commands. Total failures: %d." % num_failures)
|
|
|
|
def test_add_duplicate_different_type(self):
    """
    For every ordered pair of distinct record types, add the first type
    to a name and then the second, and check the collision and
    replacement behaviour.

    Mismatches are printed and counted, and reported together at the
    end.
    """
    zone = self.custom_zone
    node = "testrecord"
    num_failures = 0

    for record_type_str_1 in self.good_records:
        record1 = self.good_records[record_type_str_1][0]
        self.add_record(zone, node, record_type_str_1, record1)

        for record_type_str_2 in self.good_records:
            if record_type_str_1 == record_type_str_2:
                continue

            record2 = self.good_records[record_type_str_2][0]

            # Which types take part in this pair, in either position.
            pair = (record_type_str_1, record_type_str_2)
            has_a = 'A' in pair
            has_aaaa = 'AAAA' in pair
            has_cname = 'CNAME' in pair
            has_ptr = 'PTR' in pair
            has_mx = 'MX' in pair
            has_srv = 'SRV' in pair
            has_txt = 'TXT' in pair

            # add_error_ok is True when the second add is expected to fail.
            add_error_ok = False
            # If we attempt to add any record except A or AAAA when we already have an NS record,
            # the add should fail.
            if record_type_str_1 == 'NS' and not (has_a or has_aaaa):
                add_error_ok = True
            # If we attempt to add a CNAME when an A, PTR or MX record exists, the add should fail.
            if record_type_str_2 == 'CNAME' and (has_ptr or has_mx or has_a or has_aaaa):
                add_error_ok = True
            # If we have a CNAME, adding an A, AAAA, SRV or TXT record should fail.
            # If we have an A, AAAA, SRV or TXT record, adding a CNAME should fail.
            if has_cname and (has_a or has_aaaa or has_srv or has_txt):
                add_error_ok = True

            try:
                self.add_record(zone, node, record_type_str_2, record2)
                if add_error_ok:
                    num_failures += 1
                    print("Expected error when adding %s while a %s existed."
                          % (record_type_str_2, record_type_str_1))
            except AssertionError:
                if not add_error_ok:
                    num_failures += 1
                    print("Didn't expect error when adding %s while a %s existed."
                          % (record_type_str_2, record_type_str_1))

            if not add_error_ok:
                # In the "normal" case, we expect the add to work and us to have one of each type of record afterwards.
                expected_num_type_1 = 1
                expected_num_type_2 = 1

                # If we have an MX record, a PTR record should replace it when added.
                # If we have a PTR record, an MX record should replace it when added.
                if has_ptr and has_mx:
                    expected_num_type_1 = 0

                # If we have a CNAME, SRV or TXT record, a PTR or MX record should replace it when added.
                if (has_cname or has_srv or has_txt) and record_type_str_2 in ('PTR', 'MX'):
                    expected_num_type_1 = 0

                if record_type_str_1 == 'NS' and (has_a or has_aaaa):
                    expected_num_type_2 = 0

                try:
                    self.assert_num_records(zone, node, record_type_str_1,
                                            expected_num=expected_num_type_1)
                except AssertionError:
                    num_failures += 1
                    print("Expected %s %s records after adding a %s record and a %s record already existed."
                          % (expected_num_type_1, record_type_str_1, record_type_str_2, record_type_str_1))
                try:
                    self.assert_num_records(zone, node, record_type_str_2,
                                            expected_num=expected_num_type_2)
                except AssertionError:
                    num_failures += 1
                    print("Expected %s %s records after adding a %s record and a %s record already existed."
                          % (expected_num_type_2, record_type_str_2, record_type_str_2, record_type_str_1))

            # Best-effort cleanup: the second record may never have been
            # added, so a failed delete is deliberately ignored.
            try:
                self.delete_record(zone, node, record_type_str_2, record2)
            except AssertionError:
                pass

        self.delete_record(zone, node, record_type_str_1, record1)

    self.assertTrue(num_failures == 0, "Failed collision and replacement behavior. Total failures: %d." % num_failures)
|
|
|
|
# Windows fails this test in the same way we do.
|
|
def _test_cname(self):
    """
    Check special properties of CNAME records.

    The leading underscore keeps the test runner from collecting this
    method.
    """
    zone = self.custom_zone

    # RFC 1912: When there is a CNAME record, there must not be any other records with the same alias
    cname_value = self.good_records["CNAME"][1]
    self.add_record(zone, "testrecord", "CNAME", cname_value)

    for rtype, values in self.good_records.items():
        other_value = values[0]
        self.add_record(zone, "testrecord", rtype, other_value, assertion=False)
        self.assert_num_records(zone, "testrecord", rtype, expected_num=0)

    # RFC 2181: MX & NS records must not be allowed to point to a CNAME alias
    self.add_record(zone, "mxrec", "MX", "testrecord 1", assertion=False)
    self.add_record(zone, "nsrec", "NS", "testrecord", assertion=False)

    self.delete_record(zone, "testrecord", "CNAME", cname_value)
|
|
|
|
def test_add_duplicate_value(self):
    """
    Adding the same value twice must fail the second time, for every
    record type, and leave exactly one record.
    """
    zone = self.custom_zone
    for rtype, values in self.good_records.items():
        value = values[0]

        self.add_record(zone, "testrecord", rtype, value)
        self.add_record(zone, "testrecord", rtype, value, assertion=False)
        self.assert_num_records(zone, "testrecord", rtype)
        self.delete_record(zone, "testrecord", rtype, value)
|
|
|
|
def test_add_similar_value(self):
    """
    Add two different values with the same name and type, for each
    adjacent pair of entries in good_records, and check that both
    exist. CNAME pairs are skipped.
    """
    zone = self.custom_zone
    for rtype, values in self.good_records.items():
        for earlier, later in zip(values, values[1:]):
            if rtype == 'CNAME':
                continue
            # We expect CNAME records to override one another, as
            # an alias can only map to one CNAME record.
            # Also, on Windows, when the empty string is added and
            # another record is added afterwards, the empty string
            # will be silently overridden by the new one, so it
            # fails this test for the empty string.
            # CNAME was skipped above, so this is always 2 here.
            expected_num = 1 if rtype == 'CNAME' else 2

            self.add_record(zone, "testrecord", rtype, earlier)
            self.add_record(zone, "testrecord", rtype, later)
            self.assert_num_records(zone, "testrecord", rtype, expected_num=expected_num)
            self.delete_record(zone, "testrecord", rtype, earlier)
            self.delete_record(zone, "testrecord", rtype, later)
|
|
|
|
def assert_record(self, zone, name, record_type_str, expected_record_str,
                  assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Asserts whether or not the given record with the given type exists in the
    given zone.

    zone, name and record_type_str select what is queried (they are
    handed to query_records unchanged).  expected_record_str is
    compared with the `data` attribute of every record found in the
    first entry of the result.

    If assertion is true the record must be present; if it is false
    the record must be absent.  A RuntimeError from the query is
    treated as "record not present".

    client_version is forwarded to query_records, so a non-default
    value really is used for the query.

    Raises AssertionError when the presence of the record does not
    match `assertion`.
    """
    try:
        _, result = self.query_records(zone, name, record_type_str,
                                       client_version=client_version)
    except RuntimeError:
        # The query itself failed: nothing could be found.
        if assertion:
            raise AssertionError("Record '%s' of type '%s' was not present when it should have been."
                                 % (expected_record_str, record_type_str))
        return

    found = any(record.data == expected_record_str
                for record in result.rec[0].records)

    if found and not assertion:
        raise AssertionError("Record '%s' of type '%s' was present when it shouldn't have been." % (expected_record_str, record_type_str))
    if not found and assertion:
        raise AssertionError("Record '%s' of type '%s' was not present when it should have been." % (expected_record_str, record_type_str))
|
|
|
|
def assert_num_records(self, zone, name, record_type_str, expected_num=1,
                       client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Asserts that there are a given amount of records with the given type in
    the given zone.

    The records counted are those in the first entry of the result
    returned by query_records(zone, name, record_type_str).  A
    RuntimeError from the query is treated as "no records", which is
    only acceptable when expected_num is 0.

    client_version is forwarded to query_records, so a non-default
    value really is used for the query.

    Raises AssertionError when the count differs from expected_num.
    """
    # Only the query is expected to raise RuntimeError, so keep the
    # try body down to that single call.
    try:
        _, result = self.query_records(zone, name, record_type_str,
                                       client_version=client_version)
    except RuntimeError:
        if expected_num != 0:
            raise AssertionError("There were no records of type '%s' with the name '%s' when %d were expected."
                                 % (record_type_str, name, expected_num))
        return

    num_results = len(result.rec[0].records)
    if num_results != expected_num:
        raise AssertionError("There were %d records of type '%s' with the name '%s' when %d were expected."
                             % (num_results, record_type_str, name, expected_num))
|
|
|
|
def query_records(self, zone, name, record_type_str, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Enumerate the records of one type stored at `name` in `zone`.

    record_type_str is converted with flag_from_string() and the
    query asks for authority data without child nodes.  The return
    value is whatever DnssrvEnumRecords2 returns; callers in this
    file unpack it as a pair and use the second element.  Errors
    raised by the RPC call are not caught here.
    """
    record_type = flag_from_string(record_type_str)
    select_flags = (dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA |
                    dnsserver.DNS_RPC_VIEW_NO_CHILDREN)
    return self.conn.DnssrvEnumRecords2(client_version,
                                        0,
                                        self.server,
                                        zone,
                                        name,
                                        None,
                                        record_type,
                                        select_flags,
                                        None,
                                        None)
|
|
|
|
def add_record(self, zone, name, record_type_str, record_str,
               assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Try to add `record_str` (parsed as a record of type
    `record_type_str`) under `name` in `zone`, and check the outcome.

    With assertion true the add must succeed; with assertion false
    it must fail with a RuntimeError.  Any mismatch raises
    AssertionError.

    This can also update existing records if they have the same name.
    """
    parsed = record_from_string(record_type_str, record_str, sep=' ')
    buf = dnsserver.DNS_RPC_RECORD_BUF()
    buf.rec = parsed

    try:
        # Add-only update: the record goes in the "add" slot and the
        # "delete" slot is left empty.
        self.conn.DnssrvUpdateRecord2(client_version,
                                      0,
                                      self.server,
                                      zone,
                                      name,
                                      buf,
                                      None)
    except RuntimeError as err:
        if not assertion:
            return
        raise AssertionError("Failed to add record '%s' of type '%s', which should have succeeded. Error was '%s'."
                             % (record_str, record_type_str, str(err)))
    else:
        if assertion:
            return
        raise AssertionError("Successfully added record '%s' of type '%s', which should have failed."
                             % (record_str, record_type_str))
|
|
|
|
def delete_record(self, zone, name, record_type_str, record_str,
                  assertion=True, client_version=dnsserver.DNS_CLIENT_VERSION_LONGHORN):
    """
    Try to delete `record_str` (parsed as a record of type
    `record_type_str`) from `name` in `zone`, and check the outcome.

    With assertion true the deletion must succeed; with assertion
    false it must fail with a RuntimeError.  Any mismatch raises
    AssertionError.
    """
    parsed = record_from_string(record_type_str, record_str, sep=' ')
    buf = dnsserver.DNS_RPC_RECORD_BUF()
    buf.rec = parsed

    try:
        # Delete-only update: the "add" slot is empty and the record
        # goes in the "delete" slot.
        self.conn.DnssrvUpdateRecord2(client_version,
                                      0,
                                      self.server,
                                      zone,
                                      name,
                                      None,
                                      buf)
    except RuntimeError as err:
        if not assertion:
            return
        raise AssertionError("Failed to delete record '%s' of type '%s', which should have succeeded. Error was '%s'." % (record_str, record_type_str, str(err)))
    else:
        if assertion:
            return
        raise AssertionError("Successfully deleted record '%s' of type '%s', which should have failed." % (record_str, record_type_str))
|
|
|
|
def test_query2(self):
    """
    Query 'ServerInfo' with each supported client version and check
    that the type id of the answer matches that version.
    """
    expected_typeids = (
        (dnsserver.DNS_CLIENT_VERSION_W2K,
         dnsserver.DNSSRV_TYPEID_SERVER_INFO_W2K),
        (dnsserver.DNS_CLIENT_VERSION_DOTNET,
         dnsserver.DNSSRV_TYPEID_SERVER_INFO_DOTNET),
        (dnsserver.DNS_CLIENT_VERSION_LONGHORN,
         dnsserver.DNSSRV_TYPEID_SERVER_INFO),
    )

    for client_version, expected_typeid in expected_typeids:
        # Only the type id is checked; the payload is ignored.
        typeid, _ = self.conn.DnssrvQuery2(client_version,
                                           0,
                                           self.server,
                                           None,
                                           'ServerInfo')
        self.assertEqual(expected_typeid, typeid)
|
|
|
|
|
|
# This test is to confirm that we do not support multizone operations,
|
|
# which are designated by a non-zero dwContext value (the 3rd argument
|
|
# to DnssrvOperation).
|
|
def test_operation_invalid(self):
    """
    Call DnssrvOperation with a non-zero context value on a zone
    that does not exist, and require the call to be rejected with
    WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST.
    """
    missing_zone = 'a-zone-that-does-not-exist'

    name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
    name_and_param.pszNodeName = 'AllowUpdate'
    name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE

    try:
        self.conn.DnssrvOperation(self.server,
                                  missing_zone,
                                  1,
                                  'ResetDwordProperty',
                                  dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
                                  name_and_param)
    except WERRORError as err:
        rejected = (err.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST)
    else:
        # The call went through, which is not what we want.
        rejected = False

    if not rejected:
        # We should always encounter a DOES_NOT_EXIST error.
        self.fail()
|
|
|
|
# This test is to confirm that we do not support multizone operations,
|
|
# which are designated by a non-zero dwContext value (the 5th argument
|
|
# to DnssrvOperation2).
|
|
def test_operation2_invalid(self):
    """
    Call DnssrvOperation2 with a non-zero context value on a zone
    that does not exist, and require the call to be rejected with
    WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST.
    """
    missing_zone = 'a-zone-that-does-not-exist'

    name_and_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
    name_and_param.pszNodeName = 'AllowUpdate'
    name_and_param.dwParam = dnsp.DNS_ZONE_UPDATE_SECURE

    try:
        self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                   0,
                                   self.server,
                                   missing_zone,
                                   1,
                                   'ResetDwordProperty',
                                   dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
                                   name_and_param)
    except WERRORError as err:
        rejected = (err.args[0] == werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST)
    else:
        # The call went through, which is not what we want.
        rejected = False

    if not rejected:
        # We should always encounter a DOES_NOT_EXIST error.
        self.fail()
|
|
|
|
def test_operation2(self):
    """
    Create a reverse zone with DnssrvOperation2, check that EnumZones
    reports exactly one primary reverse zone, delete the zone and
    check that EnumZones reports none.

    The deletion runs in a finally clause so that a failing check
    does not leave the zone behind for later tests.
    """
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    rev_zone = '1.168.192.in-addr.arpa'

    zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_create.pszZoneName = rev_zone
    zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
    zone_create.fAging = 0
    zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT

    request_filter = (dnsserver.DNS_ZONE_REQUEST_REVERSE |
                      dnsserver.DNS_ZONE_REQUEST_PRIMARY)

    def enum_reverse_zones():
        # Return the zone list for primary reverse zones.
        _, zones = self.conn.DnssrvComplexOperation2(client_version,
                                                     0,
                                                     self.server,
                                                     None,
                                                     'EnumZones',
                                                     dnsserver.DNSSRV_TYPEID_DWORD,
                                                     request_filter)
        return zones

    # Create zone
    self.conn.DnssrvOperation2(client_version,
                               0,
                               self.server,
                               None,
                               0,
                               'ZoneCreate',
                               dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                               zone_create)

    try:
        self.assertEqual(1, enum_reverse_zones().dwZoneCount)
    finally:
        # Delete zone
        self.conn.DnssrvOperation2(client_version,
                                   0,
                                   self.server,
                                   rev_zone,
                                   0,
                                   'DeleteZoneFromDs',
                                   dnsserver.DNSSRV_TYPEID_NULL,
                                   None)

    self.assertEqual(0, enum_reverse_zones().dwZoneCount)
|
|
|
|
def test_complexoperation2(self):
    """
    Run EnumZones through DnssrvComplexOperation2 for primary forward
    zones and for primary reverse zones.  Both answers must be a zone
    list; three forward zones and no reverse zones are expected.
    """
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    primary = dnsserver.DNS_ZONE_REQUEST_PRIMARY

    # (request filter, expected number of zones)
    cases = (
        (dnsserver.DNS_ZONE_REQUEST_FORWARD | primary, 3),
        (dnsserver.DNS_ZONE_REQUEST_REVERSE | primary, 0),
    )

    for request_filter, expected_count in cases:
        typeid, zones = self.conn.DnssrvComplexOperation2(client_version,
                                                          0,
                                                          self.server,
                                                          None,
                                                          'EnumZones',
                                                          dnsserver.DNSSRV_TYPEID_DWORD,
                                                          request_filter)
        self.assertEqual(dnsserver.DNSSRV_TYPEID_ZONE_LIST, typeid)
        self.assertEqual(expected_count, zones.dwZoneCount)
|
|
|
|
def test_enumrecords2(self):
    """
    Enumerate the NS records at '.' in the '..RootHints' zone,
    including additional data, and check the number of entries
    returned.
    """
    view = (dnsserver.DNS_RPC_VIEW_ROOT_HINT_DATA |
            dnsserver.DNS_RPC_VIEW_ADDITIONAL_DATA)
    query = (dnsserver.DNS_CLIENT_VERSION_LONGHORN,
             0,
             self.server,
             '..RootHints',
             '.',
             None,
             dnsp.DNS_TYPE_NS,
             view,
             None,
             None)

    _, roothints = self.conn.DnssrvEnumRecords2(*query)
    self.assertEqual(14, roothints.count)  # 1 NS + 13 A records (a-m)
|
|
|
|
def test_updaterecords2(self):
    """
    Exercise DnssrvUpdateRecord2 on an A record named 'dummy' in
    self.zone: add it, replace its value, then delete it, checking
    the result with DnssrvEnumRecords2 after every step.
    """
    client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
    name = 'dummy'
    old_rec = ARecord('1.2.3.4')
    new_rec = ARecord('5.6.7.8')

    # Arguments shared by every enumeration of the test node.
    enum_args = (client_version,
                 0,
                 self.server,
                 self.zone,
                 name,
                 None,
                 dnsp.DNS_TYPE_A,
                 dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA,
                 None,
                 None)

    def as_buf(record):
        # Wrap a record in the buffer type the update call takes.
        buf = dnsserver.DNS_RPC_RECORD_BUF()
        buf.rec = record
        return buf

    def update(add_buf, del_buf):
        # Apply one add and/or delete to the test node.
        self.conn.DnssrvUpdateRecord2(client_version,
                                      0,
                                      self.server,
                                      self.zone,
                                      name,
                                      add_buf,
                                      del_buf)

    def check_single_a_record(expected_data):
        # The node must hold exactly one A record with this data.
        _, result = self.conn.DnssrvEnumRecords2(*enum_args)
        self.assertEqual(1, result.count)
        self.assertEqual(1, result.rec[0].wRecordCount)
        self.assertEqual(dnsp.DNS_TYPE_A, result.rec[0].records[0].wType)
        self.assertEqual(expected_data, result.rec[0].records[0].data)

    # Add record
    update(as_buf(old_rec), None)
    check_single_a_record('1.2.3.4')

    # Update record
    update(as_buf(new_rec), as_buf(old_rec))
    check_single_a_record('5.6.7.8')

    # Delete record
    update(None, as_buf(new_rec))

    # With the record gone the enumeration is expected to fail.
    self.assertRaises(RuntimeError, self.conn.DnssrvEnumRecords2,
                      *enum_args)
|
|
|
|
# The following tests do not pass against Samba because the owner and
|
|
# group are not consistent with Windows, as well as some ACEs.
|
|
#
|
|
# The following ACEs are also required for 2012R2:
|
|
#
|
|
# (OA;CIIO;WP;ea1b7b93-5e48-46d5-bc6c-4df4fda78a35;bf967a86-0de6-11d0-a285-00aa003049e2;PS)
|
|
# (OA;OICI;RPWP;3f78c3e5-f79a-46bd-a0b8-9d18116ddc79;;PS)
|
|
#
|
|
# [TPM + Allowed-To-Act-On-Behalf-Of-Other-Identity]
|
|
def test_security_descriptor_msdcs_zone(self):
    """
    Make sure that the security descriptor of the msdcs zone is
    as expected.

    The single dnsZone object whose name starts with _msdcs is looked
    up below DC=ForestDnsZones and its nTSecurityDescriptor is
    compared with a descriptor built from a fixed SDDL string.
    """
    search_base = "DC=ForestDnsZones,%s" % self.samdb.get_default_basedn()
    zones = self.samdb.search(base=search_base,
                              scope=ldb.SCOPE_SUBTREE,
                              expression="(&(objectClass=dnsZone)(name=_msdcs*))",
                              attrs=["nTSecurityDescriptor", "objectClass"])
    self.assertEqual(len(zones), 1)
    zone = zones[0]
    self.assertIn("nTSecurityDescriptor", zone)
    raw_sd = zone["nTSecurityDescriptor"][0]
    utils = sd_utils.SDUtils(self.samdb)
    actual_sd = ndr_unpack(security.descriptor, raw_sd)

    domain_sid = security.dom_sid(self.samdb.get_domain_sid())

    # The DnsAdmins SID is handed to sddl2binary as a name mapping.
    admins = self.samdb.search(base=self.samdb.get_default_basedn(),
                               scope=ldb.SCOPE_SUBTREE,
                               expression="(sAMAccountName=DnsAdmins)",
                               attrs=["objectSid"])
    dns_admin = str(ndr_unpack(security.dom_sid, admins[0]['objectSid'][0]))

    expected_sddl = ("O:SYG:BA"
                     "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
                     "(A;;CC;;;AU)"
                     "(A;;RPLCLORC;;;WD)"
                     "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
                     "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)")
    packed_sd = descriptor.sddl2binary(expected_sddl,
                                       domain_sid, {"DnsAdmins": dns_admin})
    expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))

    diff = descriptor.get_diff_sds(expected_sd, actual_sd, domain_sid)
    failure_msg = ("SD of msdcs zone different to expected.\n"
                   "Difference was:\n%s\nExpected: %s\nGot: %s" %
                   (diff, expected_sd.as_sddl(utils.domain_sid),
                    actual_sd.as_sddl(utils.domain_sid)))
    self.assertEqual(diff, '', failure_msg)
|
|
|
|
def test_security_descriptor_forest_zone(self):
    """
    Make sure that security descriptors of forest dns zones are
    as expected.

    A temporary zone is created with DNS_DP_FOREST_DEFAULT, and the
    nTSecurityDescriptor of the zone object, of CN=MicrosoftDNS and
    of the DC=ForestDnsZones object are each compared with the
    expected descriptor.  The zone is deleted again whether or not
    the checks pass.
    """
    forest_zone = "test_forest_zone"
    zone_create_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_create_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_create_info.fAging = 0
    zone_create_info.fDsIntegrated = 1
    zone_create_info.fLoadExisting = 1

    zone_create_info.pszZoneName = forest_zone
    zone_create_info.dwDpFlags = dnsserver.DNS_DP_FOREST_DEFAULT

    self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                               0,
                               self.server,
                               None,
                               0,
                               'ZoneCreate',
                               dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                               zone_create_info)

    # Everything after the create is covered by try/finally, so a
    # failing search or assertion cannot leave the zone behind.
    try:
        partition_dn = self.samdb.get_default_basedn()
        partition_dn.add_child("DC=ForestDnsZones")
        zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
                                  expression="(name=%s)" % forest_zone,
                                  attrs=["nTSecurityDescriptor"])
        self.assertEqual(len(zones), 1)
        current_dn = zones[0].dn
        self.assertIn("nTSecurityDescriptor", zones[0])

        # Only utils.domain_sid is used, for rendering SDDL in the
        # failure message; build it once rather than per iteration.
        utils = sd_utils.SDUtils(self.samdb)
        domain_sid = security.dom_sid(self.samdb.get_domain_sid())

        res = self.samdb.search(base=self.samdb.get_default_basedn(),
                                scope=ldb.SCOPE_SUBTREE,
                                expression="(sAMAccountName=DnsAdmins)",
                                attrs=["objectSid"])

        dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))

        packed_sd = descriptor.sddl2binary("O:DAG:DA"
                                           "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
                                           "(A;;CC;;;AU)"
                                           "(A;;RPLCLORC;;;WD)"
                                           "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
                                           "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
                                           domain_sid, {"DnsAdmins": dns_admin})
        expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))

        packed_msdns = descriptor.get_dns_forest_microsoft_dns_descriptor(domain_sid,
                                                                          {"DnsAdmins": dns_admin})
        expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))

        packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
        expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
                                                              packed_part_sd))

        msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
        # (DN to read, descriptor expected on that object)
        expected_by_dn = [(current_dn.get_linearized(), expected_sd),
                          (msdns_dn.get_linearized(), expected_msdns_sd),
                          (partition_dn.get_linearized(), expected_part_sd)]

        for (key, sec_desc) in expected_by_dn:
            zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
                                      attrs=["nTSecurityDescriptor"])
            self.assertIn("nTSecurityDescriptor", zones[0])
            tmp = zones[0]["nTSecurityDescriptor"][0]

            sd = ndr_unpack(security.descriptor, tmp)
            diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)

            self.assertEqual(diff, '', "Security descriptor of forest DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
                             % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))

    finally:
        self.conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                   0,
                                   self.server,
                                   forest_zone,
                                   0,
                                   'DeleteZoneFromDs',
                                   dnsserver.DNSSRV_TYPEID_NULL,
                                   None)
|
|
|
|
def test_security_descriptor_domain_zone(self):
    """
    Make sure that security descriptors of domain dns zones are
    as expected.

    self.custom_zone is looked up below DC=DomainDnsZones, and the
    nTSecurityDescriptor of the zone object, of CN=MicrosoftDNS and
    of the DC=DomainDnsZones object are each compared with the
    expected descriptor.
    """

    partition_dn = self.samdb.get_default_basedn()
    partition_dn.add_child("DC=DomainDnsZones")
    zones = self.samdb.search(base=partition_dn, scope=ldb.SCOPE_SUBTREE,
                              expression="(name=%s)" % self.custom_zone,
                              attrs=["nTSecurityDescriptor"])
    self.assertEqual(len(zones), 1)
    current_dn = zones[0].dn
    self.assertIn("nTSecurityDescriptor", zones[0])

    # Only utils.domain_sid is used, for rendering SDDL in the
    # failure message; build it once rather than per iteration.
    utils = sd_utils.SDUtils(self.samdb)
    domain_sid = security.dom_sid(self.samdb.get_domain_sid())

    res = self.samdb.search(base=self.samdb.get_default_basedn(), scope=ldb.SCOPE_SUBTREE,
                            expression="(sAMAccountName=DnsAdmins)",
                            attrs=["objectSid"])

    dns_admin = str(ndr_unpack(security.dom_sid, res[0]['objectSid'][0]))

    packed_sd = descriptor.sddl2binary("O:DAG:DA"
                                       "D:AI(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;DA)"
                                       "(A;;CC;;;AU)"
                                       "(A;;RPLCLORC;;;WD)"
                                       "(A;;RPWPCRCCDCLCLORCWOWDSDDTSW;;;SY)"
                                       "(A;CI;RPWPCRCCDCLCRCWOWDSDDTSW;;;ED)",
                                       domain_sid, {"DnsAdmins": dns_admin})
    expected_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_sd))

    packed_msdns = descriptor.get_dns_domain_microsoft_dns_descriptor(domain_sid,
                                                                      {"DnsAdmins": dns_admin})
    expected_msdns_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor, packed_msdns))

    packed_part_sd = descriptor.get_dns_partition_descriptor(domain_sid)
    expected_part_sd = descriptor.get_clean_sd(ndr_unpack(security.descriptor,
                                                          packed_part_sd))

    msdns_dn = ldb.Dn(self.samdb, "CN=MicrosoftDNS,%s" % str(partition_dn))
    # (DN to read, descriptor expected on that object)
    expected_by_dn = [(current_dn.get_linearized(), expected_sd),
                      (msdns_dn.get_linearized(), expected_msdns_sd),
                      (partition_dn.get_linearized(), expected_part_sd)]

    for (key, sec_desc) in expected_by_dn:
        zones = self.samdb.search(base=key, scope=ldb.SCOPE_BASE,
                                  attrs=["nTSecurityDescriptor"])
        self.assertIn("nTSecurityDescriptor", zones[0])
        tmp = zones[0]["nTSecurityDescriptor"][0]

        sd = ndr_unpack(security.descriptor, tmp)
        diff = descriptor.get_diff_sds(sec_desc, sd, domain_sid)

        self.assertEqual(diff, '', "Security descriptor of domain DNS zone with DN '%s' different to expected. Difference was:\n%s\nExpected: %s\nGot: %s"
                         % (key, diff, sec_desc.as_sddl(utils.domain_sid), sd.as_sddl(utils.domain_sid)))
|