1
0
mirror of https://github.com/samba-team/samba.git synced 2025-12-24 04:23:53 +03:00
Files
samba-mirror/python/samba/tests/samba_tool/dnscmd.py
Douglas Bagnall 2f7aa81a9f samba-tool dns zoneoptions: timestamp manipulation options
There was a bug in Samba before 4.9 that marked all records intended
to be static with a current timestamp, and all records intended to be
dynamic with a zero timestamp. This was exactly the opposite of
correct behaviour.

It follows that a domain which has been upgraded past 4.9, but on
which aging is not enabled, records intended to be static will have a
timestamp from before the upgrade date (unless their nodes have
suffered a DNS update, which due to another bug, will change the
timestmap). The following command will make these truly static:

$ samba-tool dns zoneoptions --mark-old-records-static=2018-07-23 -U...

where '2018-07-23' should be replaced by the approximate date of the
upgrade beyond 4.9.

It seems riskier making blanket conversions of static records into
dynamic records, but there are sometimes useful patterns in the names
given to machines that we can exploit. For example, if there is a
group of machines with names like 'desktop-123' that are all supposed
to using dynamic DNS, the adminstrator can go

$ samba-tool dns zoneoptions --mark-records-dynamic-regex='desktop-\d+'

and there's a --mark-records-static-regex for symmetry.

These options are deliberately long and cumbersome to type, so people
have a chance to think before they get to the end. We also introduce a
'--dry-run' (or '-n') option so they can inspect the likely results
before going ahead.

*NOTE* ageing will still not work properly after this commit, due to
other bugs that will be fixed in other commits.

Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
2021-06-02 03:56:36 +00:00

1410 lines
64 KiB
Python

# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@catalyst.net.nz>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import ldb
import re
from samba.auth import system_session
from samba.samdb import SamDB
from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import dnsp
from samba.tests.samba_tool.base import SambaToolCmdTest
import time
from samba import dsdb_dns
class DnsCmdTestCase(SambaToolCmdTest):
def setUp(self):
super(DnsCmdTestCase, self).setUp()
self.dburl = "ldap://%s" % os.environ["SERVER"]
self.creds_string = "-U%s%%%s" % (os.environ["DC_USERNAME"],
os.environ["DC_PASSWORD"])
self.samdb = self.getSamDB("-H", self.dburl, self.creds_string)
self.config_dn = str(self.samdb.get_config_basedn())
self.testip = "192.168.0.193"
self.testip2 = "192.168.0.194"
self.addCleanup(self.deleteZone)
self.addZone()
# Note: SOA types don't work (and shouldn't), as we only have one zone per DNS record.
good_dns = ["SAMDOM.EXAMPLE.COM",
"1.EXAMPLE.COM",
"%sEXAMPLE.COM" % ("1." * 100),
"EXAMPLE",
"!@#$%^&*()_",
"HIGH\xFFBYTE",
"@.EXAMPLE.COM",
"."]
bad_dns = ["...",
".EXAMPLE.COM",
".EXAMPLE.",
"",
"SAMDOM..EXAMPLE.COM"]
good_mx = ["SAMDOM.EXAMPLE.COM 65530",
"SAMDOM.EXAMPLE.COM 0"]
bad_mx = ["SAMDOM.EXAMPLE.COM -1",
"SAMDOM.EXAMPLE.COM",
" ",
"SAMDOM.EXAMPLE.COM 1 1",
"SAMDOM.EXAMPLE.COM SAMDOM.EXAMPLE.COM"]
good_srv = ["SAMDOM.EXAMPLE.COM 65530 65530 65530",
"SAMDOM.EXAMPLE.COM 1 1 1"]
bad_srv = ["SAMDOM.EXAMPLE.COM 0 65536 0",
"SAMDOM.EXAMPLE.COM 0 0 65536",
"SAMDOM.EXAMPLE.COM 65536 0 0"]
for bad_dn in bad_dns:
bad_mx.append("%s 1" % bad_dn)
bad_srv.append("%s 0 0 0" % bad_dn)
for good_dn in good_dns:
good_mx.append("%s 1" % good_dn)
good_srv.append("%s 0 0 0" % good_dn)
self.good_records = {
"A":["192.168.0.1", "255.255.255.255"],
"AAAA":["1234:5678:9ABC:DEF0:0000:0000:0000:0000",
"0000:0000:0000:0000:0000:0000:0000:0000",
"1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0",
"1234:1234:1234::",
"1234:5678:9ABC:DEF0::",
"0000:0000::0000",
"1234::5678:9ABC:0000:0000:0000:0000",
"::1",
"::",
"1:1:1:1:1:1:1:1"],
"PTR": good_dns,
"CNAME": good_dns,
"NS": good_dns,
"MX": good_mx,
"SRV": good_srv,
"TXT": ["text", "", "@#!", "\n"]
}
self.bad_records = {
"A":["192.168.0.500",
"255.255.255.255/32"],
"AAAA":["GGGG:1234:5678:9ABC:0000:0000:0000:0000",
"0000:0000:0000:0000:0000:0000:0000:0000/1",
"AAAA:AAAA:AAAA:AAAA:G000:0000:0000:1234",
"1234:5678:9ABC:DEF0:1234:5678:9ABC:DEF0:1234",
"1234:5678:9ABC:DEF0:1234:5678:9ABC",
"1111::1111::1111"],
"PTR": bad_dns,
"CNAME": bad_dns,
"NS": bad_dns,
"MX": bad_mx,
"SRV": bad_srv
}
def resetZone(self):
self.deleteZone()
self.addZone()
def addZone(self):
self.zone = "zone"
result, out, err = self.runsubcmd("dns",
"zonecreate",
os.environ["SERVER"],
self.zone,
self.creds_string)
self.assertCmdSuccess(result, out, err)
def deleteZone(self):
result, out, err = self.runsubcmd("dns",
"zonedelete",
os.environ["SERVER"],
self.zone,
self.creds_string)
self.assertCmdSuccess(result, out, err)
def get_all_records(self, zone_name):
zone_dn = (f"DC={zone_name},CN=MicrosoftDNS,DC=DomainDNSZones,"
f"{self.samdb.get_default_basedn()}")
expression = "(&(objectClass=dnsNode)(!(dNSTombstoned=TRUE)))"
nodes = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
expression=expression,
attrs=["dnsRecord", "name"])
record_map = {}
for node in nodes:
name = node["name"][0].decode()
record_map[name] = list(node["dnsRecord"])
return record_map
def get_record_from_db(self, zone_name, record_name):
zones = self.samdb.search(base="DC=DomainDnsZones,%s"
% self.samdb.get_default_basedn(),
scope=ldb.SCOPE_SUBTREE,
expression="(objectClass=dnsZone)",
attrs=["cn"])
for zone in zones:
if zone_name in str(zone.dn):
zone_dn = zone.dn
break
records = self.samdb.search(base=zone_dn, scope=ldb.SCOPE_SUBTREE,
expression="(objectClass=dnsNode)",
attrs=["dnsRecord"])
for old_packed_record in records:
if record_name in str(old_packed_record.dn):
return (old_packed_record.dn,
ndr_unpack(dnsp.DnssrvRpcRecord,
old_packed_record["dnsRecord"][0]))
def test_rank_none(self):
record_str = "192.168.50.50"
record_type_str = "A"
result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
self.zone, "testrecord", record_type_str,
record_str, self.creds_string)
self.assertCmdSuccess(result, out, err,
"Failed to add record '%s' with type %s."
% (record_str, record_type_str))
dn, record = self.get_record_from_db(self.zone, "testrecord")
record.rank = 0 # DNS_RANK_NONE
res = self.samdb.dns_replace_by_dn(dn, [record])
if res is not None:
self.fail("Unable to update dns record to have DNS_RANK_NONE.")
errors = []
# The record should still exist
result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
self.zone, "testrecord", record_type_str,
self.creds_string)
try:
self.assertCmdSuccess(result, out, err,
"Failed to query for a record"
"which had DNS_RANK_NONE.")
self.assertTrue("testrecord" in out and record_str in out,
"Query for a record which had DNS_RANK_NONE"
"succeeded but produced no resulting records.")
except AssertionError as e:
# Windows produces no resulting records
pass
# We should not be able to add a duplicate
result, out, err = self.runsubcmd("dns", "add", os.environ["SERVER"],
self.zone, "testrecord", record_type_str,
record_str, self.creds_string)
try:
self.assertCmdFail(result, "Successfully added duplicate record"
"of one which had DNS_RANK_NONE.")
except AssertionError as e:
errors.append(e)
# We should be able to delete it
result, out, err = self.runsubcmd("dns", "delete", os.environ["SERVER"],
self.zone, "testrecord", record_type_str,
record_str, self.creds_string)
try:
self.assertCmdSuccess(result, out, err, "Failed to delete record"
"which had DNS_RANK_NONE.")
except AssertionError as e:
errors.append(e)
# Now the record should not exist
result, out, err = self.runsubcmd("dns", "query", os.environ["SERVER"],
self.zone, "testrecord",
record_type_str, self.creds_string)
try:
self.assertCmdFail(result, "Successfully queried for deleted record"
"which had DNS_RANK_NONE.")
except AssertionError as e:
errors.append(e)
if len(errors) > 0:
err_str = "Failed appropriate behaviour with DNS_RANK_NONE:"
for error in errors:
err_str = err_str + "\n" + str(error)
raise AssertionError(err_str)
def test_accept_valid_commands(self):
"""
For all good records, attempt to add, query and delete them.
"""
num_failures = 0
failure_msgs = []
for dnstype in self.good_records:
for record in self.good_records[dnstype]:
try:
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add"
"record %s with type %s."
% (record, dnstype))
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to query"
"record %s with qualifier %s."
% (record, dnstype))
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to remove"
"record %s with type %s."
% (record, dnstype))
except AssertionError as e:
num_failures = num_failures + 1
failure_msgs.append(e)
if num_failures > 0:
for msg in failure_msgs:
print(msg)
self.fail("Failed to accept valid commands. %d total failures."
"Errors above." % num_failures)
def test_reject_invalid_commands(self):
"""
For all bad records, attempt to add them and update to them,
making sure that both operations fail.
"""
num_failures = 0
failure_msgs = []
# Add invalid records and make sure they fail to be added
for dnstype in self.bad_records:
for record in self.bad_records[dnstype]:
try:
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, record,
self.creds_string)
self.assertCmdFail(result, "Successfully added invalid"
"record '%s' of type '%s'."
% (record, dnstype))
except AssertionError as e:
num_failures = num_failures + 1
failure_msgs.append(e)
self.resetZone()
try:
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, record,
self.creds_string)
self.assertCmdFail(result, "Successfully deleted invalid"
"record '%s' of type '%s' which"
"shouldn't exist." % (record, dnstype))
except AssertionError as e:
num_failures = num_failures + 1
failure_msgs.append(e)
self.resetZone()
# Update valid records to invalid ones and make sure they
# fail to be updated
for dnstype in self.bad_records:
for bad_record in self.bad_records[dnstype]:
good_record = self.good_records[dnstype][0]
try:
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, good_record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add "
"record '%s' with type %s."
% (record, dnstype))
result, out, err = self.runsubcmd("dns", "update",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, good_record,
bad_record,
self.creds_string)
self.assertCmdFail(result, "Successfully updated valid "
"record '%s' of type '%s' to invalid "
"record '%s' of the same type."
% (good_record, dnstype, bad_record))
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, good_record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Could not delete "
"valid record '%s' of type '%s'."
% (good_record, dnstype))
except AssertionError as e:
num_failures = num_failures + 1
failure_msgs.append(e)
self.resetZone()
if num_failures > 0:
for msg in failure_msgs:
print(msg)
self.fail("Failed to reject invalid commands. %d total failures. "
"Errors above." % num_failures)
def test_update_invalid_type(self):
"""
Make sure that a record can't be updated to one of a different type.
"""
for dnstype1 in self.good_records:
record1 = self.good_records[dnstype1][0]
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype1, record1,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add "
"record %s with type %s."
% (record1, dnstype1))
for dnstype2 in self.good_records:
record2 = self.good_records[dnstype2][0]
# Make sure that record2 isn't a valid entry of dnstype1.
# For example, any A-type will also be a valid TXT-type.
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype1, record2,
self.creds_string)
try:
self.assertCmdFail(result)
except AssertionError:
continue # Don't check this one, because record2 _is_ a valid entry of dnstype1.
# Check both ways: Give the current type and try to update,
# and give the new type and try to update.
result, out, err = self.runsubcmd("dns", "update",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype1, record1,
record2, self.creds_string)
self.assertCmdFail(result, "Successfully updated record '%s' "
"to '%s', even though the latter is of "
"type '%s' where '%s' was expected."
% (record1, record2, dnstype2, dnstype1))
result, out, err = self.runsubcmd("dns", "update",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype2, record1, record2,
self.creds_string)
self.assertCmdFail(result, "Successfully updated record "
"'%s' to '%s', even though the former "
"is of type '%s' where '%s' was expected."
% (record1, record2, dnstype1, dnstype2))
def test_update_valid_type(self):
for dnstype in self.good_records:
for record in self.good_records[dnstype]:
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add "
"record %s with type %s."
% (record, dnstype))
# Update the record to be the same.
result, out, err = self.runsubcmd("dns", "update",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, record, record,
self.creds_string)
self.assertCmdFail(result, "Successfully updated record "
"'%s' to be exactly the same." % record)
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Could not delete "
"valid record '%s' of type '%s'."
% (record, dnstype))
for record in self.good_records["SRV"]:
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
"SRV", record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add "
"record %s with type 'SRV'." % record)
split = record.split()
new_bit = str(int(split[3]) + 1)
new_record = '%s %s %s %s' % (split[0], split[1], split[2], new_bit)
result, out, err = self.runsubcmd("dns", "update",
os.environ["SERVER"],
self.zone, "testrecord",
"SRV", record,
new_record, self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to update record "
"'%s' of type '%s' to '%s'."
% (record, "SRV", new_record))
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, "testrecord",
"SRV", self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to query for "
"record '%s' of type '%s'."
% (new_record, "SRV"))
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
"SRV", new_record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Could not delete "
"valid record '%s' of type '%s'."
% (new_record, "SRV"))
# Since 'dns update' takes the current value as a parameter, make sure
# we can't enter the wrong current value for a given record.
for dnstype in self.good_records:
if len(self.good_records[dnstype]) < 3:
continue # Not enough records of this type to do this test
used_record = self.good_records[dnstype][0]
unused_record = self.good_records[dnstype][1]
new_record = self.good_records[dnstype][2]
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, used_record,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add record %s "
"with type %s." % (used_record, dnstype))
result, out, err = self.runsubcmd("dns", "update",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype, unused_record,
new_record,
self.creds_string)
self.assertCmdFail(result, "Successfully updated record '%s' "
"from '%s' to '%s', even though the given "
"source record is incorrect."
% (used_record, unused_record, new_record))
def test_invalid_types(self):
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
"SOA", "test",
self.creds_string)
self.assertCmdFail(result, "Successfully added record of type SOA, "
"when this type should not be available.")
self.assertTrue("type SOA is not supported" in err,
"Invalid error message '%s' when attempting to "
"add record of type SOA." % err)
def test_add_overlapping_different_type(self):
"""
Make sure that we can add an entry with the same name as an existing one but a different type.
"""
i = 0
for dnstype1 in self.good_records:
record1 = self.good_records[dnstype1][0]
for dnstype2 in self.good_records:
# Only do some subset of dns types, otherwise it takes a long time.
i += 1
if i % 4 != 0:
continue
if dnstype1 == dnstype2:
continue
record2 = self.good_records[dnstype2][0]
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype1, record1,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add record "
"'%s' of type '%s'." % (record1, dnstype1))
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype2, record2,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to add record "
"'%s' of type '%s' when a record '%s' "
"of type '%s' with the same name exists."
% (record1, dnstype1, record2, dnstype2))
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype1, self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to query for "
"record '%s' of type '%s' when a new "
"record '%s' of type '%s' with the same "
"name was added."
% (record1, dnstype1, record2, dnstype2))
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype2, self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to query "
"record '%s' of type '%s' which should "
"have been added with the same name as "
"record '%s' of type '%s'."
% (record2, dnstype2, record1, dnstype1))
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype1, record1,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to delete "
"record '%s' of type '%s'."
% (record1, dnstype1))
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
dnstype2, record2,
self.creds_string)
self.assertCmdSuccess(result, out, err, "Failed to delete "
"record '%s' of type '%s'."
% (record2, dnstype2))
def test_query_deleted_record(self):
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
"testrecord", "A", self.testip, self.creds_string)
self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
"testrecord", "A", self.testip, self.creds_string)
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, "testrecord",
"A", self.creds_string)
self.assertCmdFail(result)
def test_add_duplicate_record(self):
for record_type in self.good_records:
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
record_type,
self.good_records[record_type][0],
self.creds_string)
self.assertCmdSuccess(result, out, err)
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "testrecord",
record_type,
self.good_records[record_type][0],
self.creds_string)
self.assertCmdFail(result)
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, "testrecord",
record_type, self.creds_string)
self.assertCmdSuccess(result, out, err)
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
record_type,
self.good_records[record_type][0],
self.creds_string)
self.assertCmdSuccess(result, out, err)
def test_remove_deleted_record(self):
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
"testrecord", "A", self.testip, self.creds_string)
self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
"testrecord", "A", self.testip, self.creds_string)
# Attempting to delete a record that has already been deleted or has never existed should fail
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord",
"A", self.testip, self.creds_string)
self.assertCmdFail(result)
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, "testrecord",
"A", self.creds_string)
self.assertCmdFail(result)
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, "testrecord2",
"A", self.testip, self.creds_string)
self.assertCmdFail(result)
def test_cleanup_record(self):
"""
Test dns cleanup command is working fine.
"""
# add a A record
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'testa', "A", self.testip, self.creds_string)
# the above A record points to this host
dnshostname = '{0}.{1}'.format('testa', self.zone.lower())
# add a CNAME record points to above host
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'testcname', "CNAME", dnshostname, self.creds_string)
# add a NS record
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'testns', "NS", dnshostname, self.creds_string)
# add a PTR record points to above host
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'testptr', "PTR", dnshostname, self.creds_string)
# add a SRV record points to above host
srv_record = "{0} 65530 65530 65530".format(dnshostname)
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'testsrv', "SRV", srv_record, self.creds_string)
# cleanup record for this dns host
self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
dnshostname, self.creds_string)
# all records should be marked as dNSTombstoned
for record_name in ['testa', 'testcname', 'testns', 'testptr', 'testsrv']:
records = self.samdb.search(
base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
scope=ldb.SCOPE_SUBTREE,
expression="(&(objectClass=dnsNode)(name={0}))".format(record_name),
attrs=["dNSTombstoned"])
self.assertEqual(len(records), 1)
for record in records:
self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
def test_cleanup_record_no_A_record(self):
"""
Test dns cleanup command works with no A record.
"""
# add a A record
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'notesta', "A", self.testip, self.creds_string)
# the above A record points to this host
dnshostname = '{0}.{1}'.format('testa', self.zone.lower())
# add a CNAME record points to above host
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'notestcname', "CNAME", dnshostname, self.creds_string)
# add a NS record
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'notestns', "NS", dnshostname, self.creds_string)
# add a PTR record points to above host
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'notestptr', "PTR", dnshostname, self.creds_string)
# add a SRV record points to above host
srv_record = "{0} 65530 65530 65530".format(dnshostname)
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
'notestsrv', "SRV", srv_record, self.creds_string)
# Remove the initial A record (leading to hanging references)
self.runsubcmd("dns", "delete", os.environ["SERVER"], self.zone,
'notesta', "A", self.testip, self.creds_string)
# cleanup record for this dns host
self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
dnshostname, self.creds_string)
# all records should be marked as dNSTombstoned
for record_name in ['notestcname', 'notestns', 'notestptr', 'notestsrv']:
records = self.samdb.search(
base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
scope=ldb.SCOPE_SUBTREE,
expression="(&(objectClass=dnsNode)(name={0}))".format(record_name),
attrs=["dNSTombstoned"])
self.assertEqual(len(records), 1)
for record in records:
self.assertEqual(str(record['dNSTombstoned']), 'TRUE')
def test_cleanup_multi_srv_record(self):
"""
Test dns cleanup command for multi-valued SRV record.
Steps:
- Add 2 A records host1 and host2
- Add a SRV record srv1 and points to both host1 and host2
- Run cleanup command for host1
- Check records for srv1, data for host1 should be gone and host2 is kept.
"""
hosts = ['host1', 'host2'] # A record names
srv_name = 'srv1'
# add A records
for host in hosts:
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
host, "A", self.testip, self.creds_string)
# the above A record points to this host
dnshostname = '{0}.{1}'.format(host, self.zone.lower())
# add a SRV record points to above host
srv_record = "{0} 65530 65530 65530".format(dnshostname)
self.runsubcmd("dns", "add", os.environ["SERVER"], self.zone,
srv_name, "SRV", srv_record, self.creds_string)
records = self.samdb.search(
base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
scope=ldb.SCOPE_SUBTREE,
expression="(&(objectClass=dnsNode)(name={0}))".format(srv_name),
attrs=['dnsRecord'])
# should have 2 records here
self.assertEqual(len(records[0]['dnsRecord']), 2)
# cleanup record for dns host1
dnshostname1 = 'host1.{0}'.format(self.zone.lower())
self.runsubcmd("dns", "cleanup", os.environ["SERVER"],
dnshostname1, self.creds_string)
records = self.samdb.search(
base="DC=DomainDnsZones,{0}".format(self.samdb.get_default_basedn()),
scope=ldb.SCOPE_SUBTREE,
expression="(&(objectClass=dnsNode)(name={0}))".format(srv_name),
attrs=['dnsRecord', 'dNSTombstoned'])
# dnsRecord for host1 should be deleted
self.assertEqual(len(records[0]['dnsRecord']), 1)
# unpack data
dns_record_bin = records[0]['dnsRecord'][0]
dns_record_obj = ndr_unpack(dnsp.DnssrvRpcRecord, dns_record_bin)
# dnsRecord for host2 is still there and is the only one
dnshostname2 = 'host2.{0}'.format(self.zone.lower())
self.assertEqual(dns_record_obj.data.nameTarget, dnshostname2)
# assert that the record isn't spuriously tombstoned
self.assertTrue('dNSTombstoned' not in records[0] or
str(records[0]['dNSTombstoned']) == 'FALSE')
def test_dns_wildcards(self):
"""
Ensure that DNS wild card entries can be added deleted and queried
"""
num_failures = 0
failure_msgs = []
records = [("*.", "MISS", "A", "1.1.1.1"),
("*.SAMDOM", "MISS.SAMDOM", "A", "1.1.1.2")]
for (name, miss, dnstype, record) in records:
try:
result, out, err = self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, name,
dnstype, record,
self.creds_string)
self.assertCmdSuccess(
result,
out,
err,
("Failed to add record %s (%s) with type %s."
% (name, record, dnstype)))
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, name,
dnstype,
self.creds_string)
self.assertCmdSuccess(
result,
out,
err,
("Failed to query record %s with qualifier %s."
% (record, dnstype)))
# dns tool does not perform dns wildcard search if the name
# does not match
result, out, err = self.runsubcmd("dns", "query",
os.environ["SERVER"],
self.zone, miss,
dnstype,
self.creds_string)
self.assertCmdFail(
result,
("Failed to query record %s with qualifier %s."
% (record, dnstype)))
result, out, err = self.runsubcmd("dns", "delete",
os.environ["SERVER"],
self.zone, name,
dnstype, record,
self.creds_string)
self.assertCmdSuccess(
result,
out,
err,
("Failed to remove record %s with type %s."
% (record, dnstype)))
except AssertionError as e:
num_failures = num_failures + 1
failure_msgs.append(e)
if num_failures > 0:
for msg in failure_msgs:
print(msg)
self.fail("Failed to accept valid commands. %d total failures."
"Errors above." % num_failures)
def test_serverinfo(self):
for v in ['w2k', 'dotnet', 'longhorn']:
result, out, err = self.runsubcmd("dns",
"serverinfo",
"--client-version", v,
os.environ["SERVER"],
self.creds_string)
self.assertCmdSuccess(result,
out,
err,
"Failed to print serverinfo with "
"client version %s" % v)
self.assertTrue(out != '')
def test_zoneinfo(self):
result, out, err = self.runsubcmd("dns",
"zoneinfo",
os.environ["SERVER"],
self.zone,
self.creds_string)
self.assertCmdSuccess(result,
out,
err,
"Failed to print zoneinfo")
self.assertTrue(out != '')
def test_zoneoptions_aging(self):
for options, vals, error in (
(['--aging=1'], {'fAging': 'TRUE'}, False),
(['--aging=0'], {'fAging': 'FALSE'}, False),
(['--aging=-1'], {'fAging': 'FALSE'}, True),
(['--aging=2'], {}, True),
(['--aging=2', '--norefreshinterval=1'], {}, True),
(['--aging=1', '--norefreshinterval=1'],
{'fAging': 'TRUE', 'dwNoRefreshInterval': '1'}, False),
(['--aging=1', '--norefreshinterval=0'],
{'fAging': 'TRUE', 'dwNoRefreshInterval': '0'}, False),
(['--aging=0', '--norefreshinterval=99', '--refreshinterval=99'],
{'fAging': 'FALSE',
'dwNoRefreshInterval': '99',
'dwRefreshInterval': '99'}, False),
(['--aging=0', '--norefreshinterval=-99', '--refreshinterval=99'],
{}, True),
(['--refreshinterval=9999999'], {}, True),
(['--norefreshinterval=9999999'], {}, True),
):
result, out, err = self.runsubcmd("dns",
"zoneoptions",
os.environ["SERVER"],
self.zone,
self.creds_string,
*options)
if error:
self.assertCmdFail(result, "zoneoptions should fail")
else:
self.assertCmdSuccess(result,
out,
err,
"zoneoptions shouldn't fail")
info_r, info_out, info_err = self.runsubcmd("dns",
"zoneinfo",
os.environ["SERVER"],
self.zone,
self.creds_string)
self.assertCmdSuccess(info_r,
info_out,
info_err,
"zoneinfo shouldn't fail after zoneoptions")
info = {k: v for k, v in re.findall(r'^\s*(\w+)\s*:\s*(\w+)\s*$',
info_out,
re.MULTILINE)}
for k, v in vals.items():
self.assertIn(k, info)
self.assertEqual(v, info[k])
def ldap_add_node_with_records(self, name, records):
dn = (f"DC={name},DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
f"{self.samdb.get_default_basedn()}")
dns_records = []
for r in records:
rec = dnsp.DnssrvRpcRecord()
rec.wType = r.get('wType', dnsp.DNS_TYPE_A)
rec.rank = dnsp.DNS_RANK_ZONE
rec.dwTtlSeconds = 900
rec.dwTimeStamp = r.get('dwTimeStamp', 0)
rec.data = r.get('data', '10.10.10.10')
dns_records.append(ndr_pack(rec))
msg = ldb.Message.from_dict(self.samdb,
{'dn': dn,
"objectClass": ["top", "dnsNode"],
'dnsRecord': dns_records
})
self.samdb.add(msg)
def get_timestamp_map(self):
re_wtypes = (dnsp.DNS_TYPE_A,
dnsp.DNS_TYPE_AAAA,
dnsp.DNS_TYPE_TXT)
t = time.time()
now = dsdb_dns.unix_to_dns_timestamp(int(t))
records = self.get_all_records(self.zone)
tsmap = {}
for k, recs in records.items():
m = []
tsmap[k] = m
for rec in recs:
r = ndr_unpack(dnsp.DnssrvRpcRecord, rec)
timestamp = r.dwTimeStamp
if abs(timestamp - now) < 3:
timestamp = 'nowish'
if r.wType in re_wtypes:
m.append(('R', timestamp))
else:
m.append(('-', timestamp))
return tsmap
def test_zoneoptions_mark_records(self):
self.maxDiff = 10000
# We need a number of records to work with, so we'll use part
# of our known good records list, using three different names
# to test the regex. All these records will be static.
for dnstype in self.good_records:
for record in self.good_records[dnstype][:2]:
self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "frobitz",
dnstype, record,
self.creds_string)
self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "weergly",
dnstype, record,
self.creds_string)
self.runsubcmd("dns", "add",
os.environ["SERVER"],
self.zone, "snizle",
dnstype, record,
self.creds_string)
# and we also want some that aren't static, and some mixed
# static/dynamic records.
# timestamps are in hours since 1601; now ~= 3.7 million
for ts in (0, 100, 10 ** 6, 10 ** 7):
name = f"ts-{ts}"
self.ldap_add_node_with_records(name, [{"dwTimeStamp": ts}])
recs = []
for ts in (0, 100, 10 ** 6, 10 ** 7):
addr = f'10.{(ts >> 16) & 255}.{(ts >> 8) & 255}.{ts & 255}'
recs.append({"dwTimeStamp": ts, "data": addr})
self.ldap_add_node_with_records("ts-multi", recs)
# get the state of ALL records.
# then we make assertions about the diffs, keeping track of
# the current state.
tsmap = self.get_timestamp_map()
for options, diff, output_substrings, error in (
# --mark-old-records-static
# --mark-records-static-regex
# --mark-records-dynamic-regex
(
['--mark-old-records-static=1971-13-04'],
{},
[],
"bad date"
),
(
# using --dry-run, should be no change, but output.
['--mark-old-records-static=1971-03-04', '--dry-run'],
{},
[
"would make 1/1 records static on ts-1000000.zone.",
"would make 1/1 records static on ts-100.zone.",
"would make 2/4 records static on ts-multi.zone.",
],
False
),
(
# timestamps < ~ 3.25 million are now static
['--mark-old-records-static=1971-03-04'],
{
'ts-100': [('R', 0)],
'ts-1000000': [('R', 0)],
'ts-multi': [('R', 0), ('R', 0), ('R', 0), ('R', 10000000)]
},
[
"made 1/1 records static on ts-1000000.zone.",
"made 1/1 records static on ts-100.zone.",
"made 2/4 records static on ts-multi.zone.",
],
False
),
(
# no change, old records already static
['--mark-old-records-static=1972-03-04'],
{},
[],
False
),
(
# no change, samba-tool added records already static
['--mark-records-static-regex=sniz'],
{},
[],
False
),
(
# snizle has 2 A, 2 AAAA, 10 fancy, and 2 TXT records, in
# that order.
# the A, AAAA, and TXT recrods should be dynamic
['--mark-records-dynamic-regex=sniz'],
{'snizle': [('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('R', 'nowish'),
('R', 'nowish')]
},
['made 6/16 records dynamic on snizle.zone.'],
False
),
(
# This regex should catch snizle, weergly, and ts-*
# but we're doing dry-run so no change
['--mark-records-dynamic-regex=[sw]', '-n'],
{},
['would make 3/4 records dynamic on ts-multi.zone.',
'would make 1/1 records dynamic on ts-0.zone.',
'would make 1/1 records dynamic on ts-1000000.zone.',
'would make 6/16 records dynamic on weergly.zone.',
'would make 1/1 records dynamic on ts-100.zone.'
],
False
),
(
# This regex should catch snizle and frobitz
# but snizle has already been changed.
['--mark-records-dynamic-regex=z'],
{'frobitz': [('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('R', 'nowish'),
('R', 'nowish')]
},
['made 6/16 records dynamic on frobitz.zone.'],
False
),
(
# This regex should catch snizle, frobitz, and
# ts-multi. Note that the 1e7 ts-multi record is
# alreay dynamic and doesn't change.
['--mark-records-dynamic-regex=[i]'],
{'ts-multi': [('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('R', 10000000)]
},
['made 3/4 records dynamic on ts-multi.zone.'],
False
),
(
# matches no records
['--mark-records-dynamic-regex=^aloooooo[qw]+'],
{},
[],
False
),
(
# This should be an error, as only one --mark-*
# argument is allowed at a time
['--mark-records-dynamic-regex=.',
'--mark-records-static-regex=.',
],
{},
[],
True
),
(
# This should also be an error
['--mark-old-records-static=1997-07-07',
'--mark-records-static-regex=.',
],
{},
[],
True
),
(
# This should not be an error. --aging and refresh
# options can be mixed with --mark ones.
['--mark-old-records-static=1997-07-07',
'--aging=0',
],
{},
['Set Aging to 0'],
False
),
(
# This regex should catch weergly, but all the
# records are already static,
['--mark-records-static-regex=wee'],
{},
[],
False
),
(
# Make frobitz static again.
['--mark-records-static-regex=obi'],
{'frobitz': [('R', 0),
('R', 0),
('R', 0),
('R', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('R', 0),
('R', 0)]
},
['made 6/16 records static on frobitz.zone.'],
False
),
(
# would make almost everything static, but --dry-run
['--mark-old-records-static=2222-03-04', '--dry-run'],
{},
[
'would make 6/16 records static on snizle.zone.',
'would make 3/4 records static on ts-multi.zone.'
],
False
),
(
# make everything static
['--mark-records-static-regex=.'],
{'snizle': [('R', 0),
('R', 0),
('R', 0),
('R', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('R', 0),
('R', 0)],
'ts-10000000': [('R', 0)],
'ts-multi': [('R', 0), ('R', 0), ('R', 0), ('R', 0)]
},
[
'made 4/4 records static on ts-multi.zone.',
'made 1/1 records static on ts-10000000.zone.',
'made 6/16 records static on snizle.zone.',
],
False
),
(
# make everything dynamic that can be
['--mark-records-dynamic-regex=.'],
{'frobitz': [('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('R', 'nowish'),
('R', 'nowish')],
'snizle': [('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('R', 'nowish'),
('R', 'nowish')],
'ts-0': [('R', 'nowish')],
'ts-100': [('R', 'nowish')],
'ts-1000000': [('R', 'nowish')],
'ts-10000000': [('R', 'nowish')],
'ts-multi': [('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish')],
'weergly': [('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('R', 'nowish'),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('-', 0),
('R', 'nowish'),
('R', 'nowish')]
},
[
'made 4/4 records dynamic on ts-multi.zone.',
'made 6/16 records dynamic on snizle.zone.',
'made 1/1 records dynamic on ts-0.zone.',
'made 1/1 records dynamic on ts-1000000.zone.',
'made 1/1 records dynamic on ts-10000000.zone.',
'made 1/1 records dynamic on ts-100.zone.',
'made 6/16 records dynamic on frobitz.zone.',
'made 6/16 records dynamic on weergly.zone.',
],
False
),
):
result, out, err = self.runsubcmd("dns",
"zoneoptions",
os.environ["SERVER"],
self.zone,
self.creds_string,
*options)
if error:
self.assertCmdFail(result, f"zoneoptions should fail ({error})")
else:
self.assertCmdSuccess(result,
out,
err,
"zoneoptions shouldn't fail")
new_tsmap = self.get_timestamp_map()
# same keys, always
self.assertEqual(sorted(new_tsmap), sorted(tsmap))
changes = {}
for k in tsmap:
if tsmap[k] != new_tsmap[k]:
changes[k] = new_tsmap[k]
self.assertEqual(diff, changes)
for s in output_substrings:
self.assertIn(s, out)
tsmap = new_tsmap