mirror of
https://github.com/samba-team/samba.git
synced 2025-01-13 13:18:06 +03:00
de2b775e9a
With Windows, when aging is off, the record timestamps are updated anyway, but the timestamp change is not replicated. We are not going to do it like that. With aging off, our records will keep their first timestamp. Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
2787 lines
109 KiB
Python
2787 lines
109 KiB
Python
# Unix SMB/CIFS implementation.
|
|
# Copyright (C) Kai Blin <kai@samba.org> 2011
|
|
# Copyright (C) Catalyst.NET 2021
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import sys
|
|
from samba import dsdb
|
|
from samba import dsdb_dns
|
|
from samba.ndr import ndr_unpack, ndr_pack
|
|
from samba.samdb import SamDB
|
|
from samba.auth import system_session
|
|
import ldb
|
|
from samba import credentials
|
|
from samba.dcerpc import dns, dnsp, dnsserver
|
|
from samba.dnsserver import TXTRecord, ARecord
|
|
from samba.dnsserver import recbuf_from_string, ipv6_normalise
|
|
from samba.tests.subunitrun import SubunitOptions, TestProgram
|
|
from samba import werror, WERRORError
|
|
from samba.tests.dns_base import DNSTest
|
|
import samba.getopt as options
|
|
import optparse
|
|
import time
|
|
from samba.colour import c_RED, c_GREEN, c_DARK_YELLOW
|
|
|
|
# Command line: two positional arguments (server name, server IP) plus the
# standard Samba, credentials and subunit option groups.
parser = optparse.OptionParser(
    "dns_aging.py <server name> <server ip> [options]")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)

# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)

opts, args = parser.parse_args()
if len(args) < 2:
    # Both positional arguments are mandatory.
    parser.print_usage()
    sys.exit(1)

# Module-wide configuration shared by the helpers and the test class below.
LP = sambaopts.get_loadparm()
CREDS = credopts.get_credentials(LP)
SERVER_NAME, SERVER_IP = args[:2]
CREDS.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)

DOMAIN = CREDS.get_realm().lower()
|
|
|
|
# DNS record timestamps are counted in hours; the values below are
# ballpark extremes used to sanity check timestamps read from the server.
# The Unix epoch expressed as a DNS timestamp is 24 * 365.25 * 369.
DNS_TIMESTAMP_1970 = 3234654
DNS_TIMESTAMP_2101 = 4383000
# a middling timestamp
DNS_TIMESTAMP_1981 = 3333333

# Addresses used as A/AAAA record data.
IPv4_ADDR = "127.0.0.33"
IPv6_ADDR = "::1"
IPv4_ADDR_2 = "127.0.0.66"
IPv6_ADDR_2 = "1::1"
|
|
|
|
|
|
def get_samdb():
    """Return a SamDB connected to the test server over LDAP.

    The connection uses the module-level loadparm (LP) and credentials
    (CREDS) and targets ldap://SERVER_IP.
    """
    ldap_url = f"ldap://{SERVER_IP}"
    return SamDB(url=ldap_url,
                 lp=LP,
                 session_info=system_session(),
                 credentials=CREDS)
|
|
|
|
|
|
def get_file_samdb():
    """Return a SamDB opened on the URL given by LP.samdb_url().

    This is for Samba only: direct file access is needed for the
    tombstoning functions. (For Windows, we instruct it to tombstone
    over RPC.)
    """
    db_url = LP.samdb_url()
    return SamDB(url=db_url,
                 lp=LP,
                 session_info=system_session(),
                 credentials=CREDS)
|
|
|
|
|
|
def get_rpc():
    """Return a dnsserver RPC connection to SERVER_IP (ncacn_ip_tcp, [sign])."""
    binding = f"ncacn_ip_tcp:{SERVER_IP}[sign]"
    return dnsserver.dnsserver(binding, LP, CREDS)
|
|
|
|
|
|
def create_zone(name, rpc=None, aging=True):
    """Create a DS-integrated primary zone called `name` via the RPC pipe.

    name  -- the zone name.
    rpc   -- an existing dnsserver connection; a new one is opened when
             this is None.
    aging -- truthiness decides the zone's fAging flag.

    The zone is requested with unsecure updates allowed. Errors from the
    RPC call propagate to the caller.
    """
    conn = get_rpc() if rpc is None else rpc

    zone_info = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
    zone_info.pszZoneName = name
    zone_info.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
    zone_info.fAging = 1 if aging else 0
    zone_info.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
    zone_info.fDsIntegrated = 1
    zone_info.fLoadExisting = 1
    zone_info.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE

    conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                          0,
                          SERVER_IP,
                          None,
                          0,
                          'ZoneCreate',
                          dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                          zone_info)
|
|
|
|
|
|
def delete_zone(name, rpc=None):
    """Delete the zone `name` with the 'DeleteZoneFromDs' RPC operation.

    rpc is an existing dnsserver connection; a new one is opened when it
    is None. Errors from the RPC call propagate to the caller.
    """
    conn = get_rpc() if rpc is None else rpc
    conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                          0,
                          SERVER_IP,
                          name,
                          0,
                          'DeleteZoneFromDs',
                          dnsserver.DNSSRV_TYPEID_NULL,
                          None)
|
|
|
|
|
|
def txt_s_list(txt):
    """Build a dnsp.string_list from a string or a list of strings.

    A bare string is treated as a one-element list.
    """
    strings = [txt] if isinstance(txt, str) else txt

    result = dnsp.string_list()
    result.count = len(strings)
    result.str = strings
    return result
|
|
|
|
|
|
def make_txt_record(txt):
    """Return a dns.txt_record whose txt field holds `txt` as a string list."""
    record = dns.txt_record()
    record.txt = txt_s_list(txt)
    return record
|
|
|
|
|
|
def copy_rec(rec):
    """Return a new DNS_RPC_RECORD carrying the same field values as `rec`.

    The fields are assigned, not deep-copied, so `data` is shared with
    the original record.
    """
    duplicate = dnsserver.DNS_RPC_RECORD()
    for field in ('wType',
                  'dwFlags',
                  'dwSerial',
                  'dwTtlSeconds',
                  'data',
                  'dwTimeStamp'):
        setattr(duplicate, field, getattr(rec, field))
    return duplicate
|
|
|
|
|
|
def guess_wtype(data):
    """Guess a DNS record type from the shape of `data`.

    Returns a (data, wtype) pair:
      * a list is wrapped in a TXT record and typed DNS_TYPE_TXT;
      * anything containing ":" is typed DNS_TYPE_AAAA;
      * everything else is typed DNS_TYPE_A.
    """
    if isinstance(data, list):
        return (make_txt_record(data), dnsp.DNS_TYPE_TXT)

    wtype = dnsp.DNS_TYPE_AAAA if ":" in data else dnsp.DNS_TYPE_A
    return (data, wtype)
|
|
|
|
|
|
class TestDNSAging(DNSTest):
|
|
"""Probe DNS aging and scavenging, using LDAP and RPC to set and test
|
|
the timestamps behind DNS's back."""
|
|
server = SERVER_NAME
|
|
server_ip = SERVER_IP
|
|
creds = CREDS
|
|
|
|
def setUp(self):
    """Open RPC and LDAP connections and prepare a zone for this test.

    The zone is named after the test function and is scheduled for
    deletion on cleanup. An already existing zone is tolerated.
    """
    super().setUp()
    self.rpc_conn = get_rpc()
    self.samdb = get_samdb()

    # We always have a zone of our own named after the test function.
    self.zone = self.id().rsplit('.', 1)[1]
    self.addCleanup(delete_zone, self.zone, self.rpc_conn)
    try:
        create_zone(self.zone, self.rpc_conn)
    except WERRORError as err:
        if err.args[0] == werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
            print(f"zone {self.zone} already exists")
        else:
            raise

    # Though we set this in create_zone(), that doesn't work on
    # Windows, so we repeat again here.
    self.set_zone_int_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)

    basedn = self.samdb.get_default_basedn()
    self.zone_dn = (f"DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
                    f"{basedn}")
|
|
|
|
def set_zone_int_params(self, zone=None, **kwargs):
    """Set integer properties on a zone, one RPC call per keyword. e.g.:

        self.set_zone_int_params(Aging=1,
                                 RefreshInterval=222)

    zone defaults to the test's own zone. A WERRORError from the server
    is turned into a test failure.

    See [MS-DNSP] 3.1.1.2.1 "DNS Zone Integer Properties" for names.
    """
    target_zone = self.zone if zone is None else zone

    for prop_name, prop_value in kwargs.items():
        param = dnsserver.DNS_RPC_NAME_AND_PARAM()
        param.dwParam = prop_value
        param.pszNodeName = prop_name
        try:
            self.rpc_conn.DnssrvOperation2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0,
                SERVER_IP,
                target_zone,
                0,
                'ResetDwordProperty',
                dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
                param)
        except WERRORError as err:
            self.fail(str(err))
|
|
|
|
def rpc_replace(self, name, old=None, new=None):
    """Replace record `old` with record `new` on node `name` over RPC.

    Either argument may be a DNS_RPC_RECORD (which is wrapped in a
    DNS_RPC_RECORD_BUF), a DNS_RPC_RECORD_BUF, or None. A WERRORError
    from the server becomes a test failure.
    """
    def as_buf(rec):
        # wrap our recs, if necessary
        if not isinstance(rec, dnsserver.DNS_RPC_RECORD):
            return rec
        buf = dnsserver.DNS_RPC_RECORD_BUF()
        buf.rec = rec
        return buf

    new_buf = as_buf(new)
    old_buf = as_buf(old)

    try:
        self.rpc_conn.DnssrvUpdateRecord2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,
            SERVER_IP,
            self.zone,
            name,
            new_buf,
            old_buf)
    except WERRORError as e:
        self.fail(f"could not replace record ({e})")
|
|
|
|
def get_unique_txt_record(self, name, txt):
    """Return the TXT record on `name` whose strings equal `txt`.

    `txt` may be a string or a list of strings. Returns None when there
    is no such record, and fails the test if more than one matches.
    """
    wanted = [txt] if isinstance(txt, str) else txt

    found = None
    for rec in self.ldap_get_records(name):
        if rec.wType == dnsp.DNS_TYPE_TXT and list(rec.data.str) == wanted:
            # a second hit means the record is not unique
            self.assertIsNone(found)
            found = rec
    return found
|
|
|
|
def get_unique_ip_record(self, name, addr, wtype=None):
    """Return the A or AAAA record on `name` whose data matches `addr`.

    When wtype is None the type is guessed from addr. Returns None when
    nothing matches, and fails the test if more than one record matches.
    """
    if wtype is None:
        addr, wtype = guess_wtype(addr)

    candidates = self.ldap_get_records(name)

    # We need to use the internal dns_record_match because not all
    # forms always match on strings (e.g. IPv6)
    template = dnsp.DnssrvRpcRecord()
    template.wType = wtype
    template.data = addr

    found = None
    for candidate in candidates:
        if not dsdb_dns.records_match(candidate, template):
            continue
        self.assertIsNone(found)
        found = candidate
    return found
|
|
|
|
def dns_query(self, name, qtype=dns.DNS_QTYPE_ALL):
    """Query `name` within the test zone over UDP.

    Making a query might help Windows notice LDAP changes. Returns the
    first element of the pair given by dns_transaction_udp().
    """
    packet = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    question = self.make_name_question("%s.%s" % (name, self.zone),
                                       qtype,
                                       dns.DNS_QCLASS_IN)
    self.finish_name_packet(packet, [question])
    result = self.dns_transaction_udp(packet, host=SERVER_IP)
    first, _second = result
    return first
|
|
|
|
def dns_update_non_text(self, name,
                        data,
                        wtype=None,
                        qclass=dns.DNS_QCLASS_IN):
    """Send a DNS update for one record on `name` and expect RCODE OK.

    data   -- the record data, or None for an update with empty rdata.
    wtype  -- the record type; guessed from data when None.
    qclass -- the class of the update record. DNS_QCLASS_IN uses a TTL
              of 123, any other class uses a TTL of 0.

    Returns the second element of the dns_transaction_udp() result.
    """
    if wtype is None:
        data, wtype = guess_wtype(data)

    ttl = 123 if qclass == dns.DNS_QCLASS_IN else 0

    fullname = "%s.%s" % (name, self.zone)
    packet = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
    zone_question = self.make_name_question(self.zone,
                                            dns.DNS_QTYPE_SOA,
                                            dns.DNS_QCLASS_IN)
    self.finish_name_packet(packet, [zone_question])

    update = dns.res_rec()
    update.name = fullname
    update.rr_type = wtype
    update.rr_class = qclass
    update.ttl = ttl
    if data is None:
        update.length = 0
    else:
        update.length = 0xffff
        update.rdata = data

    packet.nscount = 1
    packet.nsrecs = [update]

    code, response = self.dns_transaction_udp(packet, host=SERVER_IP)
    self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
    return response
|
|
|
|
def dns_delete(self, name, data, wtype=None):
    """Send a DNS update for `data` on `name` using class DNS_QCLASS_NONE."""
    response = self.dns_update_non_text(name,
                                        data,
                                        wtype,
                                        qclass=dns.DNS_QCLASS_NONE)
    return response
|
|
|
|
def dns_delete_type(self, name, wtype):
    """Send a data-less DNS update of type `wtype` on `name`, using class
    DNS_QCLASS_ANY."""
    response = self.dns_update_non_text(name,
                                        None,
                                        wtype,
                                        qclass=dns.DNS_QCLASS_ANY)
    return response
|
|
|
|
def dns_update_record(self, name, txt, ttl=900):
    """Add or refresh a TXT record via a DNS update, and return the
    resulting record as read back over LDAP.

    `txt` may be a string or a list of strings. The update must succeed
    with RCODE OK; a REFUSED answer first prints a hint about the test
    environment.
    """
    strings = [txt] if isinstance(txt, str) else txt
    packet = self.make_txt_update(name, strings, self.zone, ttl=ttl)
    code, response = self.dns_transaction_udp(packet, host=SERVER_IP)

    refused = (code.operation & dns.DNS_RCODE) == dns.DNS_RCODE_REFUSED
    if refused:
        # sometimes you might forget this
        print("\n\ngot DNS_RCODE_REFUSED\n")
        print("Are you running this in the fl2003 environment?\n")
        print("try `SELFTEST_TESTENV='fl2003dc:local' make testenv`\n\n")

    self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
    return self.get_unique_txt_record(name, strings)
|
|
|
|
def rpc_update_record(self, name, txt, **kwargs):
    """Add the record that self.dns_update_record() would add, via the
    dnsserver RPC pipe.

    Keyword arguments are set as attributes on the new record. As with
    DNS update, if the record already exists, we replace it. Returns the
    record as read back over LDAP.
    """
    strings = [txt] if isinstance(txt, str) else txt

    existing = TXTRecord(strings)
    replacement = TXTRecord(strings)
    for attr, value in kwargs.items():
        setattr(replacement, attr, value)

    try:
        self.rpc_replace(name, existing, replacement)
    except AssertionError as e:
        # rpc_replace() has caught the WERROR and wrapped it in a test
        # failure; only "record does not exist" means we should add.
        if 'WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST' in str(e):
            self.rpc_replace(name, None, replacement)
        else:
            raise

    return self.get_unique_txt_record(name, strings)
|
|
|
|
def rpc_delete_txt(self, name, txt):
    """Delete the TXT record `txt` (a string or list of strings) from
    `name` over RPC, by replacing it with nothing."""
    strings = [txt] if isinstance(txt, str) else txt
    self.rpc_replace(name, TXTRecord(strings), None)
|
|
|
|
def get_one_node(self, name):
    """Search the test zone over LDAP for the dnsNode called `name`.

    Returns the search result message, or None when there is no such
    node. More than one result fails the test.
    """
    nodes = self.samdb.search(
        base=self.zone_dn,
        scope=ldb.SCOPE_SUBTREE,
        expression=f"(&(objectClass=dnsNode)(name={name}))",
        attrs=["dnsRecord", "dNSTombstoned", "name"])

    if len(nodes) > 1:
        self.fail(
            f"expected 0 or 1 dnsNodes for {name}, found {len(nodes)}")

    return nodes[0] if len(nodes) != 0 else None
|
|
|
|
def ldap_get_records(self, name):
    """Return the unpacked dnsRecord values of node `name`, or an empty
    list when the node does not exist."""
    node = self.get_one_node(name)
    if node is None:
        return []

    packed_records = node.get('dnsRecord')
    unpacked = []
    for blob in packed_records:
        unpacked.append(ndr_unpack(dnsp.DnssrvRpcRecord, blob))
    return unpacked
|
|
|
|
def ldap_get_non_tombstoned_records(self, name):
    """Return the records of node `name`, leaving out any whose type is
    DNS_TYPE_TOMBSTONE."""
    return [rec for rec in self.ldap_get_records(name)
            if rec.wType != dnsp.DNS_TYPE_TOMBSTONE]
|
|
|
|
def assert_tombstoned(self, name, tombstoned=True, timestamp=None):
    """Assert that node `name` is (or is not) tombstoned.

    If run with tombstoned=False, assert it isn't tombstoned (and has no
    traces of tombstone). Otherwise assert it has all the necessary
    bits: dNSTombstoned is TRUE and the node holds exactly one record,
    of type DNS_TYPE_TOMBSTONE.

    timestamp controls the check on the tombstone's NTTIME data:
      * timestamp=<non-zero number of hours>: assert that the nttime
        timestamp is about that time (within 3 hours either way);
      * timestamp=None: assert it is within a century or so;
      * timestamp=False (or 0): don't assert on it.

    The test fails if the node does not exist.
    """
    node = self.get_one_node(name)
    if node is None:
        self.fail(f"no node named {name}")

    dnsts = node.get("dNSTombstoned")
    if dnsts is None:
        is_tombstoned = False
    else:
        self.assertEqual(len(dnsts), 1)
        is_tombstoned = dnsts[0] == b'TRUE'

    if tombstoned != is_tombstoned:
        if is_tombstoned:
            self.fail(f"{name} is tombstoned")
        else:
            self.fail(f"{name} is not tombstoned")

    recs = self.ldap_get_records(name)
    if is_tombstoned:
        self.assertEqual(len(recs), 1)
        self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
        if timestamp is None:
            self.assert_nttime_in_hour_range(recs[0].data)
        elif timestamp:
            self.assert_nttime_in_hour_range(recs[0].data,
                                             timestamp - 3,
                                             timestamp + 3)
    else:
        # Check every record; testing recs[0] on each iteration would
        # let a tombstone record in any later position go unnoticed.
        for r in recs:
            self.assertNotEqual(r.wType, dnsp.DNS_TYPE_TOMBSTONE)
|
|
|
|
def ldap_replace_records(self, name, records):
    """Replace the dnsRecord values of node `name` with `records`.

    We use raw ldap to avoid the "helpfulness" of dsdb_dns.replace().
    If the node does not exist yet it is added as a dnsNode; any other
    LdbError propagates.
    """
    packed = [ndr_pack(rec) for rec in records]
    msg = ldb.Message.from_dict(self.samdb,
                                {'dn': f'DC={name},{self.zone_dn}',
                                 'dnsRecord': packed},
                                ldb.FLAG_MOD_REPLACE)

    try:
        self.samdb.modify(msg)
    except ldb.LdbError as e:
        if 'LDAP_NO_SUCH_OBJECT' in e.args[1]:
            # We need to do an add
            msg["objectClass"] = ["top", "dnsNode"]
            msg["dnsRecord"].set_flags(ldb.FLAG_MOD_ADD)
            self.samdb.add(msg)
        else:
            raise
|
|
|
|
def ldap_update_core(self, name, wtype, data, **kwargs):
    """Write a record of any type onto node `name` over LDAP.

    This one is not TXT specific. A record is built from default values
    (zone rank, TTL 900, serial 110, timestamp 0), then keyword
    arguments override its attributes. If the node already has a
    matching record (per dsdb_dns.records_match) the first match is
    replaced, otherwise the new record is appended.

    Returns the record that was written.
    """
    records = self.ldap_get_records(name)

    # default values
    new_rec = dnsp.DnssrvRpcRecord()
    new_rec.wType = wtype
    new_rec.rank = dnsp.DNS_RANK_ZONE
    new_rec.dwTtlSeconds = 900
    new_rec.dwSerial = 110
    new_rec.dwTimeStamp = 0
    new_rec.data = data

    # override defaults, as required
    for attr, value in kwargs.items():
        setattr(new_rec, attr, value)

    # index of the first matching record, if there is one
    position = next((idx for idx, existing in enumerate(records)
                     if dsdb_dns.records_match(existing, new_rec)),
                    None)
    if position is None:
        # record not found
        records.append(new_rec)
    else:
        records[position] = new_rec

    self.ldap_replace_records(name, records)
    return new_rec
|
|
|
|
def ldap_update_record(self, name, txt, **kwargs):
    """Add the record that self.dns_update_record() would add, via ldap,
    thus allowing us to set additional dnsRecord features like
    dwTimestamp.

    `txt` may be a string or a list of strings; keyword arguments set
    attributes on the record. After writing, the node is read back and
    the stored record is checked for rank, TTL and timestamp.

    Returns the record as read back. The test fails if the record cannot
    be found again, or if it is found more than once.
    """
    rec = self.ldap_update_core(name,
                                dnsp.DNS_TYPE_TXT,
                                txt_s_list(txt),
                                **kwargs)

    recs = self.ldap_get_records(name)
    match = None
    for r in recs:
        if r.wType != rec.wType:
            continue
        if r.data.str == rec.data.str:
            self.assertIsNone(match, f"duplicate records for {name}")
            match = r

    # Fail clearly here, rather than with an AttributeError on None
    # below, if the record we just wrote did not come back.
    self.assertIsNotNone(match,
                         f"record {txt} not found on {name} after update")

    # only the low 8 bits of rank are compared
    self.assertEqual(match.rank, rec.rank & 255)
    self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds)
    self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp)
    return match
|
|
|
|
def ldap_delete_record(self, name, data, wtype=dnsp.DNS_TYPE_TXT):
    """Remove one record from node `name` over LDAP.

    For TXT records `data` is a string or list of strings; for other
    types it is used as the record data directly. The first record that
    matches (per dsdb_dns.records_match) is removed; the test fails if
    nothing matches.
    """
    if wtype == dnsp.DNS_TYPE_TXT:
        data = txt_s_list(data)

    template = dnsp.DnssrvRpcRecord()
    template.wType = wtype
    template.data = data

    records = self.ldap_get_records(name)
    position = next((idx for idx, existing in enumerate(records)
                     if dsdb_dns.records_match(existing, template)),
                    None)
    if position is None:
        self.fail(f"record {data} not found")
    del records[position]

    self.ldap_replace_records(name, records)
|
|
|
|
def add_ip_record(self, name, addr, wtype=None, **kwargs):
    """Write an address record onto node `name` over LDAP and return it
    as read back from the server.

    When wtype is None the type is guessed from addr. Keyword arguments
    set attributes on the record. The stored record is checked for rank,
    TTL and timestamp. The test fails if the record cannot be found
    again, or if it is found more than once.
    """
    if wtype is None:
        addr, wtype = guess_wtype(addr)
    rec = self.ldap_update_core(name,
                                wtype,
                                addr,
                                **kwargs)

    recs = self.ldap_get_records(name)
    match = None
    for r in recs:
        if dsdb_dns.records_match(r, rec):
            self.assertIsNone(match, f"duplicate records for {name}")
            match = r

    # Fail clearly here, rather than with an AttributeError on None
    # below, if the record we just wrote did not come back.
    self.assertIsNotNone(match,
                         f"record {addr} not found on {name} after update")

    # only the low 8 bits of rank are compared
    self.assertEqual(match.rank, rec.rank & 255)
    self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds)
    self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp)
    return match
|
|
|
|
def ldap_modify_timestamps(self, name, delta):
    """Add `delta` to dwTimeStamp on every record of node `name`, writing
    the result back over LDAP."""
    shifted = self.ldap_get_records(name)
    for record in shifted:
        record.dwTimeStamp = record.dwTimeStamp + delta
    self.ldap_replace_records(name, shifted)
|
|
|
|
def get_rpc_records(self, name, dns_type=None):
    """Enumerate the records of `name` in the test zone over RPC.

    dns_type defaults to DNS_TYPE_ALL. Returns a flat list gathering the
    `records` of every entry in the reply, or an empty list when the
    reply is empty.
    """
    wanted_type = dnsp.DNS_TYPE_ALL if dns_type is None else dns_type
    select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA

    buflen, res = self.rpc_conn.DnssrvEnumRecords2(
        dnsserver.DNS_CLIENT_VERSION_LONGHORN,
        0,
        SERVER_IP,
        self.zone,
        name,
        None,
        wanted_type,
        select_flags,
        None,
        None)

    if not res or res.count == 0:
        return []

    return [record for entry in res.rec for record in entry.records]
|
|
|
|
def dns_tombstone(self, name,
                  epoch_hours=DNS_TIMESTAMP_1981,
                  epoch_nttime=None):
    """Turn node `name` into a tombstone over LDAP, adding the node if
    it does not exist.

    epoch_hours  -- the value for the record's dwTimeStamp.
    epoch_nttime -- the NTTIME stored as the record data; when None it
                    is derived from epoch_hours.
    """
    tombstone = dnsp.DnssrvRpcRecord()
    tombstone.wType = dnsp.DNS_TYPE_TOMBSTONE
    # dwTimeStamp is a 32 bit value in hours, and data is an NTTIME
    # (100 nanosecond intervals), both in the 1601 epoch. A tombstone
    # will have both, but expiration calculations use the data NTTIME
    # EntombedTime timestamp (see [MS-DNSP]).
    tombstone.dwTimeStamp = epoch_hours
    tombstone.data = (epoch_hours * 3600 * 10 * 1000 * 1000
                      if epoch_nttime is None
                      else epoch_nttime)

    msg = ldb.Message.from_dict(self.samdb,
                                {'dn': f'DC={name},{self.zone_dn}',
                                 'dnsRecord': [ndr_pack(tombstone)],
                                 'dnsTombstoned': 'TRUE'},
                                ldb.FLAG_MOD_REPLACE)
    try:
        self.samdb.modify(msg)
    except ldb.LdbError as e:
        if 'LDAP_NO_SUCH_OBJECT' in e.args[1]:
            # We need to do an add
            msg["objectClass"] = ["top", "dnsNode"]
            self.samdb.add(msg)
        else:
            raise
|
|
|
|
def set_aging(self, enable=False):
    """Set the test zone's Aging property to 1 if `enable` is truthy,
    otherwise to 0."""
    flag = 1 if enable else 0
    self.set_zone_int_params(Aging=flag)
|
|
|
|
def assert_timestamp_in_ballpark(self, rec):
    """Assert that rec.dwTimeStamp lies strictly between
    DNS_TIMESTAMP_1970 and DNS_TIMESTAMP_2101."""
    for check, bound in ((self.assertGreater, DNS_TIMESTAMP_1970),
                         (self.assertLess, DNS_TIMESTAMP_2101)):
        check(rec.dwTimeStamp, bound)
|
|
|
|
def assert_nttime_in_hour_range(self, t,
                                hour_min=DNS_TIMESTAMP_1970,
                                hour_max=DNS_TIMESTAMP_2101):
    """Assert that the NTTIME `t` (100 nanosecond units), floored to
    whole hours, lies strictly between hour_min and hour_max."""
    units_per_hour = int(3600 * 1e7)
    hours = t // units_per_hour
    self.assertGreater(hours, hour_min)
    self.assertLess(hours, hour_max)
|
|
|
|
def assert_soon_after(self, timestamp, reference):
    """Assert that a timestamp is the same or very slightly higher than a
    reference timestamp.

    Either argument may be a number of hours or an object with a
    dwTimeStamp attribute. The timestamp may be 0, 1 or 2 hours after
    the reference; anything else raises AssertionError.

    Typically we expect the timestamps to be identical, unless an hour
    has clicked over since the reference was taken. However we allow one
    more hour in case it happens during a daylight savings transition or
    something.
    """
    # turn records into timestamps if need be
    timestamp = getattr(timestamp, 'dwTimeStamp', timestamp)
    reference = getattr(reference, 'dwTimeStamp', reference)

    diff = timestamp - reference
    days = abs(diff / 24.0)

    if diff < 0:
        raise AssertionError(
            f"timestamp is {days} days ({abs(diff)} hours) before reference")
    if diff > 2:
        raise AssertionError(
            f"timestamp is {days} days ({diff} hours) after reference")
|
|
|
|
def assert_timestamps_equal(self, ts1, ts2):
    """Just like assertEqual(), but tells us the difference, not the
    absolute values. e.g:

        self.assertEqual(a, b)
        AssertionError: 3685491 != 3685371

        self.assert_timestamps_equal(a, b)
        AssertionError: -120 (first is 5.0 days earlier than second)

    Also, we turn a record into a timestamp if we need to: either
    argument may be an object with a dwTimeStamp attribute.
    """
    ts1 = getattr(ts1, 'dwTimeStamp', ts1)
    ts2 = getattr(ts2, 'dwTimeStamp', ts2)

    if ts1 == ts2:
        return

    diff = ts1 - ts2
    days = abs(diff / 24.0)
    if ts1 == 0 or ts2 == 0:
        # when comparing to zero we don't want the number of days.
        raise AssertionError(f"timestamp {ts1} != {ts2}")
    if diff > 0:
        raise AssertionError(
            f"{ts1} is {days} days ({diff} hours) after {ts2}")
    raise AssertionError(
        f"{ts1} is {days} days ({abs(diff)} hours) before {ts2}")
|
|
|
|
def test_update_timestamps_aging_off_then_on(self):
    """A DNS update leaves an old timestamp alone while aging is off,
    and brings it up to at least the start time once aging is on."""
    # we will add a record with aging off
    # it will have the current timestamp
    self.set_aging(False)
    fresh_name = 'timestamp-now'
    old_name = 'timestamp-eightdays'

    fresh = self.dns_update_record(fresh_name, [fresh_name])
    start_time = fresh.dwTimeStamp
    self.assert_timestamp_in_ballpark(fresh)

    # write a second record over LDAP, timestamped 8 days back
    eight_days_ago = start_time - 8 * 24
    old = self.ldap_update_record(old_name, [old_name],
                                  dwTimeStamp=eight_days_ago)
    self.assert_timestamps_equal(old.dwTimeStamp, eight_days_ago)

    # with aging turned off, we expect no change
    old = self.dns_update_record(old_name, [old_name])
    self.assert_timestamps_equal(old.dwTimeStamp, eight_days_ago)

    # when aging is on, we expect change
    self.set_aging(True)
    old = self.dns_update_record(old_name, [old_name])
    self.assertGreaterEqual(old.dwTimeStamp, start_time)
|
|
|
|
def test_rpc_update_timestamps(self):
    """An RPC update makes a record static (timestamp 0) whatever
    timestamp is requested, with aging off or on, and a later DNS update
    does not change that."""
    # RPC always sets timestamps to zero on Windows.
    self.set_aging(False)
    name = 'timestamp-now'

    first = self.dns_update_record(name, [name])
    start_time = first.dwTimeStamp
    self.assert_timestamp_in_ballpark(first)

    # attempt to alter the timestamp to something close by.
    eight_days_ago = start_time - 8 * 24
    updated = self.rpc_update_record(name, [name],
                                     dwTimeStamp=eight_days_ago)
    self.assertEqual(updated.dwTimeStamp, 0)

    # try again, with aging on
    self.set_aging(True)
    updated = self.rpc_update_record(name, [name],
                                     dwTimeStamp=eight_days_ago)
    self.assertEqual(updated.dwTimeStamp, 0)

    # now that the record is static, a dns update won't change it
    updated = self.dns_update_record(name, [name])
    self.assertEqual(updated.dwTimeStamp, 0)

    # but a record under another name behaves normally, getting a
    # timestamp at or just after the start time.
    name2 = 'timestamp-eightdays'
    other = self.dns_update_record(name2, [name2])
    self.assert_soon_after(other.dwTimeStamp,
                           start_time)
|
|
|
|
def get_txt_timestamps(self, name, *txts):
    """Return the timestamps of the given TXT records on node `name`.

    Each of `txts` is a list of strings (a bare string is treated as a
    one-element list, as in the other TXT helpers). The result holds the
    dwTimeStamp of every TXT record whose strings equal a requested
    value, ordered by the arguments. A value with no matching record
    contributes nothing.

    Records of other types are skipped, as in get_unique_txt_record(),
    because their data has no string list to compare.
    """
    txt_records = [(list(r.data.str), r.dwTimeStamp)
                   for r in self.ldap_get_records(name)
                   if r.wType == dnsp.DNS_TYPE_TXT]

    ret = []
    for t in txts:
        wanted = [t] if isinstance(t, str) else t
        ret.extend(stamp for strings, stamp in txt_records
                   if strings == wanted)
    return ret
|
|
|
|
def test_update_aging_disabled_2(self):
    """With aging disabled, a DNS update of one TXT record leaves the
    timestamps of existing records on the node as they were.

    Timestamps are wound back over LDAP to a range of values, then a DNS
    update is made and every record's timestamp is checked.
    """
    name = 'test'
    txt1 = ['test txt']
    txt2 = ['test', 'txt2']
    txt3 = ['test', 'txt3']

    self.set_aging(False)

    current_time = self.dns_update_record(name, txt1).dwTimeStamp

    # now, then 6, 8, 15, 100 and 1000 days ago (timestamps are in hours)
    recent = (current_time,) + tuple(current_time - days * 24
                                     for days in (6, 8, 15, 100, 1000))
    # the same again, followed by two very old absolute timestamps
    with_100 = recent + (100000, 100)
    with_10 = recent + (100000, 10)

    for timestamp in recent:
        # wind back
        self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
        self.assertEqual(self.get_txt_timestamps(name, txt1), [timestamp])

        # no change here
        update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
        self.assert_timestamps_equal(update_timestamp, timestamp)

    # adding a fresh record
    for timestamp in with_100:
        # wind back
        timestamp1 = self.ldap_update_record(
            name,
            txt1,
            dwTimeStamp=timestamp).dwTimeStamp
        self.assert_timestamps_equal(timestamp1, timestamp)

        self.dns_update_record(name, txt2)
        timestamps = self.get_txt_timestamps(name, txt1, txt2)
        self.assertEqual(timestamps, [timestamp, current_time])

        self.ldap_delete_record(name, txt2)
        timestamps = self.get_txt_timestamps(name, txt1)
        self.assertEqual(timestamps, [timestamp])

    # add record 2.
    timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
    self.assert_soon_after(timestamp2, current_time)

    for timestamp in with_100:
        # wind back
        self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
        self.assert_timestamps_equal(timestamp1, timestamp)

        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
        # txt1 keeps the wound-back timestamp
        timestamps = self.get_txt_timestamps(name, txt1, txt2)
        self.assertEqual(timestamps, [timestamp, current_time])

    # with 3 records, no change
    for timestamp in with_10:
        # wind back
        self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
        self.ldap_update_record(name, txt2, dwTimeStamp=timestamp)
        self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
        self.assert_timestamps_equal(timestamp3, timestamp + 30)

        _ = self.dns_update_record(name, txt2).dwTimeStamp
        timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
        self.assertEqual(timestamps, [timestamp,
                                      timestamp,
                                      timestamp + 30])

    # with 3 records, one of which is static
    # first we set the updatee's timestamp to a recognisable number
    self.ldap_update_record(name, txt2, dwTimeStamp=999999)
    for timestamp in with_10:
        # wind back
        self.ldap_update_record(name, txt1, dwTimeStamp=0)
        self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp - 9))
        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
        self.assert_timestamps_equal(timestamp3, timestamp - 9)

        self.dns_update_record(name, txt2)
        timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
        self.assertEqual(timestamps, [0,
                                      999999,
                                      timestamp - 9])

    # with 3 records, updating one which is static
    timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
    for timestamp in with_10:
        # wind back
        self.ldap_update_record(name, txt1, dwTimeStamp=0)
        self.ldap_update_record(name, txt2, dwTimeStamp=0)
        self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
        self.assert_timestamps_equal(timestamp3, timestamp + 30)

        _ = self.dns_update_record(name, txt2).dwTimeStamp
        timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
        self.assertEqual(timestamps, [0,
                                      0,
                                      timestamp + 30])

    # with 3 records, after the static nodes have been replaced
    self.ldap_update_record(name, txt1, dwTimeStamp=777777)
    self.ldap_update_record(name, txt2, dwTimeStamp=888888)
    timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
    for timestamp in with_10:
        # wind back
        self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp))
        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
        self.assert_timestamps_equal(timestamp3, timestamp)

        self.dns_update_record(name, txt2)
        timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
        self.assertEqual(timestamps, [777777,
                                      888888,
                                      timestamp])
|
|
|
|
def _test_update_aging_disabled_n_days_ago(self, n_days):
    """Shared scenario: with aging disabled, wind a TXT record back by
    `n_days` days and check the timestamps seen after DNS updates to it
    and to sibling records on the same node.

    Some expectations for sibling records depend on whether n_days is at
    most 7.
    """
    name = 'test'
    txt1 = ['1']
    txt2 = ['2']

    def stored_timestamp(txt):
        # the timestamp currently stored for this TXT record
        return self.get_unique_txt_record(name, txt).dwTimeStamp

    def dns_update_timestamp(txt):
        # do a DNS update and return the resulting timestamp
        return self.dns_update_record(name, txt).dwTimeStamp

    self.set_aging(False)
    current_time = dns_update_timestamp(txt1)

    # rewind timestamp using ldap
    self.ldap_modify_timestamps(name, n_days * -24)
    n_days_ago = stored_timestamp(txt1)
    self.assertGreater(current_time, n_days_ago)

    # no change when updating this record
    update_timestamp = dns_update_timestamp(txt1)
    self.assert_timestamps_equal(update_timestamp, n_days_ago)

    # add another record, which should have the current timestamp
    ts2 = dns_update_timestamp(txt2)
    self.assert_soon_after(ts2, current_time)

    # the original record's timestamp is compared with the new one
    ts1 = stored_timestamp(txt1)
    self.assert_timestamps_equal(ts1, ts2)

    # let's repeat that, this time with txt2 existing
    self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)

    ts1 = dns_update_timestamp(txt1)
    self.assert_timestamps_equal(ts1, n_days_ago)

    # this update is not an add
    ts2 = dns_update_timestamp(txt2)
    self.assert_soon_after(ts2, current_time)

    # now timestamp1 is not changed
    ts1 = stored_timestamp(txt1)
    self.assert_timestamps_equal(ts1, n_days_ago)

    # delete record2, try again
    self.ldap_delete_record(name, txt2)
    self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)

    ts1 = dns_update_timestamp(txt1)
    self.assert_timestamps_equal(ts1, n_days_ago)

    # here we are re-adding the deleted record
    ts2 = dns_update_timestamp(txt2)
    self.assert_soon_after(ts2, current_time)

    ts1 = stored_timestamp(txt1)

    # It gets weird HERE.
    # note how the SIBLING of the deleted, re-added record differs
    # from the sibling of freshly added record, depending on the
    # time difference.
    if n_days <= 7:
        self.assert_timestamps_equal(ts1, n_days_ago)
    else:
        self.assert_timestamps_equal(ts1, ts2)

    # re-timestamp record2, try again
    self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
    self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)

    ts1 = dns_update_timestamp(txt1)
    self.assert_timestamps_equal(ts1, n_days_ago)

    # no change
    ts2 = dns_update_timestamp(txt2)
    self.assert_timestamps_equal(ts2, n_days_ago)
    # also no change
    ts1 = stored_timestamp(txt1)
    self.assert_timestamps_equal(ts1, ts2)

    # let's introduce another record
    txt3 = ['3']
    self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
    self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)

    ts3 = dns_update_timestamp(txt3)
    self.assert_soon_after(ts3, current_time)

    ts1 = stored_timestamp(txt1)
    ts2 = stored_timestamp(txt2)

    if n_days <= 7:
        self.assert_timestamps_equal(ts1, n_days_ago)
    else:
        self.assert_timestamps_equal(ts1, ts3)

    self.assert_timestamps_equal(ts2, ts3)

    # delete and re-add the third record, then check the siblings again
    self.ldap_delete_record(name, txt3)
    ts3 = dns_update_timestamp(txt3)
    self.assert_soon_after(ts3, current_time)
    ts1 = stored_timestamp(txt1)
    ts2 = stored_timestamp(txt2)

    if n_days <= 7:
        self.assert_timestamps_equal(ts1, n_days_ago)
    else:
        self.assert_timestamps_equal(ts1, ts3)

    self.assert_timestamps_equal(ts2, ts3)

    txt4 = ['4']

    # and here we'll make txt1 static
    self.ldap_update_record(name, txt1, dwTimeStamp=0)
    self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
    self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
    ts1 = dns_update_timestamp(txt1)
    ts2 = stored_timestamp(txt2)
    ts3 = stored_timestamp(txt3)
    ts4 = dns_update_timestamp(txt4)

    self.assertEqual(ts1, 0)
    self.assert_timestamps_equal(ts2, n_days_ago)
    self.assert_timestamps_equal(ts3, n_days_ago)
    self.assert_soon_after(ts4, current_time)
|
|
|
|
def test_update_aging_disabled_in_no_refresh_window(self):
    """Run the aging-disabled n-days-ago scenario with a value of 4.

    Four is below the 7-day threshold that the scenario branches on.
    """
    days_back = 4
    self._test_update_aging_disabled_n_days_ago(days_back)
|
|
|
|
def test_update_aging_disabled_on_no_refresh_boundary(self):
    """Run the aging-disabled n-days-ago scenario with a value of 7.

    Seven sits exactly on the threshold the scenario branches on
    (it tests n_days <= 7).
    """
    days_back = 7
    self._test_update_aging_disabled_n_days_ago(days_back)
|
|
|
|
def test_update_aging_disabled_in_refresh_window(self):
    """Run the aging-disabled n-days-ago scenario with a value of 9.

    Nine is just past the 7-day threshold the scenario branches on.
    """
    days_back = 9
    self._test_update_aging_disabled_n_days_ago(days_back)
|
|
|
|
def test_update_aging_disabled_beyond_refresh_window(self):
    """Run the aging-disabled n-days-ago scenario with a value of 16.

    The test name describes 16 days as lying beyond the refresh
    window.
    """
    days_back = 16
    self._test_update_aging_disabled_n_days_ago(days_back)
|
|
|
|
def test_update_aging_disabled_in_eighteenth_century(self):
    """Run the aging-disabled n-days-ago scenario with a value of 100000.

    100000 days is roughly 274 years, hence the test name.
    """
    days_back = 100000
    self._test_update_aging_disabled_n_days_ago(days_back)
|
|
|
|
def test_update_aging_disabled_static(self):
    """Check DNS updates on a node holding a static (timestamp 0) record.

    Aging is switched off.  Record '1' is repeatedly forced to
    timestamp 0 over LDAP, and the timestamps seen after DNS updates
    of it and of its sibling '2' are asserted.
    """
    name = 'test'
    static_txt = ['1']
    sibling_txt = ['2']

    def dns_stamp(txt):
        # timestamp on the record returned by a DNS update
        return self.dns_update_record(name, txt).dwTimeStamp

    def stored_stamp(txt):
        # timestamp on the record returned by get_unique_txt_record
        return self.get_unique_txt_record(name, txt).dwTimeStamp

    def force_stamp(txt, value):
        # write the record over LDAP with the given timestamp
        self.ldap_update_record(name, txt, dwTimeStamp=value)

    self.set_aging(False)

    start = dns_stamp(static_txt)
    force_stamp(static_txt, 0)

    # A DNS update of the zeroed record is expected to leave it at 0.
    self.assertEqual(dns_stamp(static_txt), 0)

    # Adding a sibling: the new record is expected to be stamped
    # with the current time...
    sibling_stamp = dns_stamp(sibling_txt)
    self.assert_soon_after(sibling_stamp, start)

    # ...and the first record is expected to be current as well.
    self.assert_soon_after(stored_stamp(static_txt), start)

    # Same again, now that the sibling already exists.
    # NOTE(review): the timestamp returned by this update is never
    # checked; the assertion that follows re-tests the sibling's
    # earlier value.  Possibly the first record's timestamp was
    # meant -- confirm.
    dns_stamp(static_txt)
    self.assert_soon_after(sibling_stamp, start)

    sibling_stamp = dns_stamp(sibling_txt)
    self.assert_soon_after(sibling_stamp, start)

    # Delete the sibling, zero the first record, and try again.
    self.ldap_delete_record(name, sibling_txt)
    force_stamp(static_txt, 0)
    # updating the zeroed record is expected to keep it at 0
    self.assertEqual(dns_stamp(static_txt), 0)

    # this time the re-added sibling is expected to be 0 as well
    self.assertEqual(dns_stamp(sibling_txt), 0)

    self.assertEqual(stored_stamp(static_txt), 0)

    # Give the sibling a non-zero timestamp, zero the first again.
    force_stamp(sibling_txt, 1)
    force_stamp(static_txt, 0)
    # the DNS update is expected to leave the sibling's timestamp
    self.assert_timestamps_equal(dns_stamp(sibling_txt), 1)
|
|
|
|
def test_update_aging_disabled(self):
    """Exercise timestamp handling for three TXT records with aging off.

    Records on one node are rewound over LDAP and then updated via
    DNS or RPC; after each step the timestamps of the updated record
    and of its siblings are asserted.  Offsets are written as
    days * 24.
    """
    name = 'test'
    txt1 = ['test txt']
    txt2 = ['test', 'txt2']
    txt3 = ['test', 'txt3']
    per_day = 24
    minus_6 = -6 * per_day
    minus_8 = -8 * per_day

    def dns_stamp(txt):
        # timestamp on the record returned by a DNS update
        return self.dns_update_record(name, txt).dwTimeStamp

    def stored_stamp(txt):
        # timestamp on the record returned by get_unique_txt_record
        return self.get_unique_txt_record(name, txt).dwTimeStamp

    self.set_aging(False)

    start = dns_stamp(txt1)

    # Rewind the node's timestamps by six days over LDAP.
    self.ldap_modify_timestamps(name, minus_6)
    six_days_ago = stored_stamp(txt1)
    self.assert_timestamps_equal(six_days_ago, start + minus_6)

    # A DNS update is expected to leave the rewound timestamp alone.
    self.assert_timestamps_equal(dns_stamp(txt1), six_days_ago)

    self.check_query_txt(name, txt1, zone=self.zone)

    # A second record is expected to get the current time...
    ts2 = dns_stamp(txt2)
    self.assert_soon_after(ts2, start)

    # ...and the first record is expected to match it.
    ts1 = stored_stamp(txt1)
    self.assert_timestamps_equal(ts1, ts2)

    # Set both records back to 8 days ago.
    self.ldap_modify_timestamps(name, minus_8)

    eight_days_ago = stored_stamp(txt1)
    self.assert_timestamps_equal(eight_days_ago, start + minus_8)

    # With aging off, a DNS update is expected to change neither
    # the updated record nor its sibling.
    ts2 = dns_stamp(txt2)
    self.assert_timestamps_equal(ts2, eight_days_ago)
    ts1 = stored_stamp(txt1)
    self.assert_timestamps_equal(ts1, eight_days_ago)

    # Adding a third record: it is expected to carry the current
    # time, and the other two are expected to match it.
    ts3 = dns_stamp(txt3)
    self.assert_soon_after(ts3, start)
    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)
    self.assert_timestamps_equal(ts1, ts3)
    self.assert_timestamps_equal(ts2, ts3)

    hundred_days_ago = start - 100 * per_day
    thousand_days_ago = start - 1000 * per_day
    for txt, stamp in ((txt1, hundred_days_ago),
                       (txt2, thousand_days_ago)):
        record = self.ldap_update_record(name, txt, dwTimeStamp=stamp)
        self.assert_timestamps_equal(record.dwTimeStamp, stamp)

    # Update record 3: it is expected to be current afterwards,
    # while records 1 and 2 are expected to keep their old stamps.
    ts3 = dns_stamp(txt3)
    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)
    self.assert_soon_after(ts3, start)
    self.assert_timestamps_equal(ts1, hundred_days_ago)
    self.assert_timestamps_equal(ts2, thousand_days_ago)

    fifteen_days_ago = start - 15 * per_day
    self.ldap_update_record(name, txt3, dwTimeStamp=fifteen_days_ago)

    # A DNS update of record 2 is expected to change nothing.
    ts2 = dns_stamp(txt2)
    ts1 = stored_stamp(txt1)
    ts3 = stored_stamp(txt3)
    self.assert_timestamps_equal(ts2, thousand_days_ago)
    self.assert_timestamps_equal(ts1, hundred_days_ago)
    self.assert_timestamps_equal(ts3, fifteen_days_ago)

    # The same with record 3 at eight days ago: still no change
    # is expected anywhere.
    self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
    ts2 = dns_stamp(txt2)
    ts1 = stored_stamp(txt1)
    ts3 = stored_stamp(txt3)
    self.assert_timestamps_equal(ts2, thousand_days_ago)
    self.assert_timestamps_equal(ts1, hundred_days_ago)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    # An RPC update of record 2 followed by a DNS update of
    # record 3: record 2 is expected to end with a zero timestamp
    # and record 1 with a current one.
    self.rpc_update_record(name, txt2)

    # NOTE(review): the timestamp returned by this DNS update of
    # record 3 is discarded, so the final assertion on ts3 checks
    # the value read before the RPC update.  Confirm whether record
    # 3 was meant to be re-read here.
    dns_stamp(txt3)
    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)
    self.assertEqual(ts2, 0)
    self.assert_soon_after(ts1, start)
    self.assert_timestamps_equal(ts3, eight_days_ago)
|
|
|
|
def test_update_aging_enabled(self):
    """Exercise timestamp handling for several TXT records with aging on.

    Records on one node are given assorted ages over LDAP, then
    updated via DNS or RPC; after each step the timestamps of the
    updated record and of its siblings are asserted.  Offsets are
    written as days * 24.
    """
    name = 'test'
    txt1 = ['test txt']
    txt2 = ['test', 'txt2']
    txt3 = ['test', 'txt3']
    txt4 = ['4']

    def dns_stamp(txt):
        # timestamp on the record returned by a DNS update
        return self.dns_update_record(name, txt).dwTimeStamp

    def stored_stamp(txt):
        # timestamp on the record returned by get_unique_txt_record
        return self.get_unique_txt_record(name, txt).dwTimeStamp

    def set_stamp(txt, value):
        # write the record over LDAP with the given timestamp
        self.ldap_update_record(name, txt, dwTimeStamp=value)

    self.set_aging(True)

    now = dns_stamp(txt2)

    six_days_ago = now - 6 * 24
    eight_days_ago = now - 8 * 24
    fifteen_days_ago = now - 15 * 24
    hundred_days_ago = now - 100 * 24

    set_stamp(txt1, six_days_ago)

    # Six days is inside the 7-day dwNoRefreshInterval, so a DNS
    # update is expected to leave the timestamp alone.
    ts1 = dns_stamp(txt1)
    ts2 = stored_stamp(txt2)

    self.assert_timestamps_equal(ts1, six_days_ago)
    self.assert_soon_after(ts2, now)

    set_stamp(txt3, eight_days_ago)
    self.assert_timestamps_equal(stored_stamp(txt3), eight_days_ago)

    # Update 1 again: nothing is expected to change for 1, 2 or 3.
    ts1 = dns_stamp(txt1)
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    self.assert_timestamps_equal(ts1, six_days_ago)
    self.assert_soon_after(ts2, now)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    # With 1 at eight days, its update is expected to refresh it.
    set_stamp(txt1, eight_days_ago)

    # 2 and 3 are expected to be unaffected by the update of 1.
    ts1 = dns_stamp(txt1)
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    self.assert_soon_after(ts1, now)
    self.assert_soon_after(ts2, now)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    # The next few steps start from these ages.
    set_stamp(txt1, fifteen_days_ago)
    set_stamp(txt2, six_days_ago)
    set_stamp(txt3, eight_days_ago)

    # 1 is fifteen days old and is still expected to be refreshed.
    ts1 = dns_stamp(txt1)
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    self.assert_soon_after(ts1, now)
    self.assert_timestamps_equal(ts2, six_days_ago)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    # reset 1
    set_stamp(txt1, fifteen_days_ago)

    # Updating 2 (six days old, so still within the no-refresh
    # interval) is expected to change nothing.
    ts2 = dns_stamp(txt2)
    ts1 = stored_stamp(txt1)
    ts3 = stored_stamp(txt3)
    self.assert_timestamps_equal(ts1, fifteen_days_ago)
    self.assert_timestamps_equal(ts2, six_days_ago)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    # Updating 3: only 3 is expected to change.
    ts3 = dns_stamp(txt3)
    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)
    self.assert_timestamps_equal(ts1, fifteen_days_ago)
    self.assert_timestamps_equal(ts2, six_days_ago)
    self.assert_soon_after(ts3, now)

    # reset 3 to 100 days
    set_stamp(txt3, hundred_days_ago)

    # Again only 3 is expected to change.
    ts3 = dns_stamp(txt3)
    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)
    self.assert_timestamps_equal(ts1, fifteen_days_ago)
    self.assert_timestamps_equal(ts2, six_days_ago)
    self.assert_soon_after(ts3, now)

    # Put 1 and 3 both at eight days: does updating 1 affect 3?
    set_stamp(txt1, eight_days_ago)
    set_stamp(txt3, eight_days_ago)

    # Only 1 is expected to change.
    ts1 = dns_stamp(txt1)
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    self.assert_soon_after(ts1, now)
    self.assert_timestamps_equal(ts2, six_days_ago)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    # An RPC update of 1 is expected to zero its timestamp and
    # leave the others alone.
    ts1 = self.rpc_update_record(name, txt1).dwTimeStamp
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    self.assertEqual(ts1, 0)
    self.assert_timestamps_equal(ts2, six_days_ago)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    # With 2 and 3 at eight days and 1 static, a DNS update of 2
    # is expected to change 2 -- but to zero.
    set_stamp(txt2, eight_days_ago)
    ts2 = dns_stamp(txt2)
    ts1 = stored_stamp(txt1)
    ts3 = stored_stamp(txt3)
    self.assert_timestamps_equal(ts1, 0)
    self.assert_timestamps_equal(ts2, 0)
    self.assert_timestamps_equal(ts3, eight_days_ago)

    set_stamp(txt2, six_days_ago)
    set_stamp(txt1, 3000000)
    self.assert_timestamps_equal(stored_stamp(txt1), 3000000)

    # No record has a zero timestamp now, yet the DNS update of 1
    # is still expected to come back with zero.
    self.assertEqual(dns_stamp(txt1), 0)

    # Adding a fourth record: it and record 1 are expected to be
    # zero, while 2 and 3 are expected to be unchanged.
    ts4 = dns_stamp(txt4)
    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    self.assert_timestamps_equal(ts1, 0)
    self.assert_timestamps_equal(ts2, six_days_ago)
    self.assert_timestamps_equal(ts3, eight_days_ago)
    self.assert_timestamps_equal(ts4, 0)
|
|
|
|
def _test_update_aging_enabled_n_days_ago(self, n_days):
    """Run the aging-enabled scenario for a record rewound by n_days.

    n_days: how many days (24 timestamp units each) the first
    record is rewound over LDAP.  Where the expected outcome depends
    on the age, the branch is chosen by n_days <= 7.
    """
    name = 'test'
    txt1 = ['1']
    txt2 = ['2']
    txt3 = ['3']
    txt4 = ['4']
    recent = n_days <= 7

    def dns_stamp(txt):
        # timestamp on the record returned by a DNS update
        return self.dns_update_record(name, txt).dwTimeStamp

    def stored_stamp(txt):
        # timestamp on the record returned by get_unique_txt_record
        return self.get_unique_txt_record(name, txt).dwTimeStamp

    def set_stamp(txt, value):
        # write the record over LDAP with the given timestamp
        self.ldap_update_record(name, txt, dwTimeStamp=value)

    self.set_aging(True)
    now = dns_stamp(txt1)

    # Rewind the node's timestamps over LDAP.
    self.ldap_modify_timestamps(name, n_days * -24)
    n_days_ago = stored_stamp(txt1)
    self.assertGreater(now, n_days_ago)

    # Whether a DNS update refreshes the timestamp depends on age.
    ts1 = dns_stamp(txt1)
    if recent:
        self.assert_timestamps_equal(ts1, n_days_ago)
    else:
        self.assert_soon_after(ts1, now)

    # A newly added record is expected to get the current time.
    ts2 = dns_stamp(txt2)
    self.assert_soon_after(ts2, now)

    # The first record is expected to be unchanged by that.
    ts1_b = stored_stamp(txt1)
    self.assert_timestamps_equal(ts1, ts1_b)

    # Repeat with record 2 already present.
    set_stamp(txt1, n_days_ago)

    ts1 = dns_stamp(txt1)
    self.assert_timestamps_equal(ts1, ts1_b)

    # This update is not an add; record 2 is already up to date.
    ts2 = dns_stamp(txt2)
    self.assert_soon_after(ts2, now)

    # Record 1 is expected to be unchanged.
    ts1 = stored_stamp(txt1)
    self.assert_timestamps_equal(ts1, ts1_b)

    # Delete record 2 and try again.
    self.ldap_delete_record(name, txt2)
    set_stamp(txt1, n_days_ago)

    ts1 = dns_stamp(txt1)
    if recent:
        self.assert_timestamps_equal(ts1, n_days_ago)
    else:
        self.assert_soon_after(ts1, now)

    # Re-add the deleted record.
    ts2 = dns_stamp(txt2)
    self.assert_soon_after(ts2, now)

    ts1 = stored_stamp(txt1)

    # The sibling of a deleted-and-re-added record is expected to
    # behave differently from the sibling of a freshly added one,
    # depending on the age.
    if recent:
        self.assert_timestamps_equal(ts1, n_days_ago)
    else:
        self.assert_timestamps_equal(ts1, ts2)

    # Re-timestamp both records and try again.
    set_stamp(txt2, n_days_ago)
    set_stamp(txt1, n_days_ago)

    # The expected timestamps are the same as before the rewind.
    ts1_b = dns_stamp(txt1)
    self.assert_timestamps_equal(ts1, ts1_b)

    ts2 = dns_stamp(txt2)
    self.assert_timestamps_equal(ts2, ts1)
    ts1 = stored_stamp(txt1)
    self.assert_timestamps_equal(ts1, ts2)

    # Introduce a third record; 1 and 2 are expected to stay put.
    set_stamp(txt2, n_days_ago)
    set_stamp(txt1, n_days_ago)

    ts3 = dns_stamp(txt3)
    self.assert_soon_after(ts3, now)

    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)

    self.assert_timestamps_equal(ts1, n_days_ago)
    self.assert_timestamps_equal(ts2, n_days_ago)

    # Delete and re-add record 3; again 1 and 2 should stay put.
    self.ldap_delete_record(name, txt3)
    ts3 = dns_stamp(txt3)
    self.assert_soon_after(ts3, now)
    ts1 = stored_stamp(txt1)
    ts2 = stored_stamp(txt2)

    self.assert_timestamps_equal(ts1, n_days_ago)
    self.assert_timestamps_equal(ts2, n_days_ago)

    # Make record 1 static (timestamp 0); a newly added record 4 is
    # then expected to be static too.
    set_stamp(txt1, 0)
    set_stamp(txt2, n_days_ago)
    set_stamp(txt3, n_days_ago)
    ts1 = dns_stamp(txt1)
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    ts4 = dns_stamp(txt4)

    self.assert_timestamps_equal(ts1, 0)
    self.assert_timestamps_equal(ts2, n_days_ago)
    self.assert_timestamps_equal(ts3, n_days_ago)
    self.assert_timestamps_equal(ts4, 0)

    # half the rewound timestamp value
    longer_ago = n_days_ago // 2

    # Remove every zero-timestamp record.
    self.ldap_delete_record(name, txt4)
    set_stamp(txt1, longer_ago)
    set_stamp(txt2, n_days_ago)
    set_stamp(txt3, n_days_ago)
    self.assert_timestamps_equal(stored_stamp(txt1), longer_ago)

    ts4 = dns_stamp(txt4)
    ts2 = stored_stamp(txt2)
    ts3 = stored_stamp(txt3)
    ts1 = stored_stamp(txt1)

    # Although no record is left from which to get a zero
    # timestamp, record 4 is still expected to get one.
    self.assert_timestamps_equal(ts1, longer_ago)
    self.assert_timestamps_equal(ts2, n_days_ago)
    self.assert_timestamps_equal(ts3, n_days_ago)
    self.assert_timestamps_equal(ts4, 0)

    # Now give record 4 a timestamp; a DNS update of record 1 is
    # expected to come back with zero and leave record 4 alone.
    set_stamp(txt4, longer_ago)
    self.assert_timestamps_equal(stored_stamp(txt4), longer_ago)
    ts1 = dns_stamp(txt1)
    ts4 = stored_stamp(txt4)
    self.assert_timestamps_equal(ts1, 0)
    self.assert_timestamps_equal(ts4, longer_ago)
|
|
|
|
def test_update_aging_enabled_in_no_refresh_window(self):
    """Run the aging-enabled n-days-ago scenario with n_days = 4.

    Four is below the 7-day threshold that the scenario branches on.
    """
    self._test_update_aging_enabled_n_days_ago(n_days=4)
|
|
|
|
def test_update_aging_enabled_on_no_refresh_boundary(self):
    """Run the aging-enabled n-days-ago scenario with n_days = 7.

    Seven sits exactly on the threshold the scenario branches on
    (it tests n_days <= 7).
    """
    self._test_update_aging_enabled_n_days_ago(n_days=7)
|
|
|
|
def test_update_aging_enabled_in_refresh_window(self):
    """Run the aging-enabled n-days-ago scenario with n_days = 9.

    Nine is just past the 7-day threshold the scenario branches on.
    """
    self._test_update_aging_enabled_n_days_ago(n_days=9)
|
|
|
|
def test_update_aging_enabled_beyond_refresh_window(self):
    """Run the aging-enabled n-days-ago scenario with n_days = 16.

    The test name describes 16 days as lying beyond the refresh
    window.
    """
    self._test_update_aging_enabled_n_days_ago(n_days=16)
|
|
|
|
def test_update_aging_enabled_in_eighteenth_century(self):
    """Run the aging-enabled n-days-ago scenario with n_days = 100000.

    100000 days is roughly 274 years, hence the test name.
    """
    self._test_update_aging_enabled_n_days_ago(n_days=100000)
|
|
|
|
def test_update_static_stickiness(self):
    """Check that zero timestamps persist on a node with aging off.

    After one record on the node is written over LDAP with
    timestamp 0, records added by DNS update are asserted to have
    timestamp 0 too -- even one added after all of the node's
    records have been replaced with an empty list.
    """
    name = 'test'
    A, B, C, D = ['A'], ['B'], ['C'], ['D']

    def stored_stamp(txt):
        # timestamp on the record returned by get_unique_txt_record
        return self.get_unique_txt_record(name, txt).dwTimeStamp

    self.set_aging(False)
    # The timestamp is read but not used, as in the original code.
    self.dns_update_record(name, A).dwTimeStamp
    self.ldap_update_record(name, B, dwTimeStamp=0)
    for txt in (B, C):
        self.dns_update_record(name, txt)
    self.assertEqual(stored_stamp(C), 0)
    self.assertEqual(stored_stamp(B), 0)

    # Replace the node's records with nothing at all.
    self.ldap_replace_records(name, [])

    self.dns_update_record(name, D)
    self.assertEqual(stored_stamp(D), 0)
|
|
|
|
def _test_update_timestamp_weirdness(self, n_days, aging=True):
    """Shared scenario: add, delete and re-add a sibling of a rewound record.

    n_days: how many days (24 timestamp units each) record A is
        rewound over LDAP.
    aging: value passed to self.set_aging().

    Returns a tuple (n_days_ago, time_A, time_B): the rewound
    timestamp of A, A's timestamp as read at the very end, and the
    timestamp returned when B was re-added.
    """
    name = 'test'
    A, B = ['A'], ['B']

    def dns_stamp(txt):
        # timestamp on the record returned by a DNS update
        return self.dns_update_record(name, txt).dwTimeStamp

    def stored_stamp(txt):
        # timestamp on the record returned by get_unique_txt_record
        return self.get_unique_txt_record(name, txt).dwTimeStamp

    self.set_aging(aging)

    current_time = dns_stamp(A)

    # Rewind the node's timestamps over LDAP.
    self.ldap_modify_timestamps(name, n_days * -24)
    n_days_ago = stored_stamp(A)
    time_A = dns_stamp(A)
    # That update is expected to reset the timestamp ONLY when
    # aging is on and the record is older than the 7-day noRefresh
    # period.
    refresh_expected = n_days > 7 and aging
    if refresh_expected:
        self.assert_soon_after(time_A, current_time)
    else:
        self.assert_timestamps_equal(time_A, n_days_ago)

    # A newly added record is expected to get the current time.
    time_B = dns_stamp(B)
    self.assert_soon_after(time_B, current_time)

    # A is expected to keep its rewound timestamp only with aging
    # on and n_days <= 7; otherwise it is expected to be current.
    time_A = stored_stamp(A)
    kept = aging and n_days <= 7
    if not kept:
        self.assert_soon_after(time_A, current_time)
    else:
        self.assert_timestamps_equal(time_A, n_days_ago)

    # Delete B, rewind A, and try again.
    self.ldap_delete_record(name, B)
    self.ldap_update_record(name, A, dwTimeStamp=n_days_ago)

    # This value is overwritten below before being used.
    time_A = dns_stamp(A)

    # Re-add the deleted record.
    time_B = dns_stamp(B)
    self.assert_soon_after(time_B, current_time)

    time_A = stored_stamp(A)
    return n_days_ago, time_A, time_B
|
|
|
|
def test_update_timestamp_weirdness_no_refresh_no_aging(self):
    """Weirdness scenario: record rewound 5 days, aging off.

    After its sibling has been deleted and re-added, record A is
    expected to still carry the rewound timestamp.
    """
    n_days_ago, time_A, _time_B = (
        self._test_update_timestamp_weirdness(5, aging=False))
    self.assert_timestamps_equal(time_A, n_days_ago)
|
|
|
|
def test_update_timestamp_weirdness_no_refresh_aging(self):
    """Weirdness scenario: record rewound 5 days, aging on.

    After its sibling has been deleted and re-added, record A is
    expected to still carry the rewound timestamp.
    """
    n_days_ago, time_A, _time_B = (
        self._test_update_timestamp_weirdness(5, aging=True))
    self.assert_timestamps_equal(time_A, n_days_ago)
|
|
|
|
def test_update_timestamp_weirdness_refresh_no_aging(self):
    """Weirdness scenario: record rewound 9 days, aging off.

    After its sibling has been deleted and re-added, record A is
    expected to carry the same timestamp as the re-added sibling.
    """
    _n_days_ago, time_A, time_B = (
        self._test_update_timestamp_weirdness(9, aging=False))
    self.assert_timestamps_equal(time_A, time_B)
|
|
|
|
def test_update_timestamp_weirdness_refresh_aging(self):
    """Weirdness scenario: record rewound 9 days, aging on.

    After its sibling has been deleted and re-added, record A is
    expected to carry the same timestamp as the re-added sibling.
    """
    _n_days_ago, time_A, time_B = (
        self._test_update_timestamp_weirdness(9, aging=True))
    self.assert_timestamps_equal(time_A, time_B)
|
|
|
|
def test_aging_refresh(self):
    """Check DNS-update refresh behaviour with non-default intervals.

    The zone is set to NoRefreshInterval=200 and RefreshInterval=160
    with aging on; a record's timestamp is then rewound in stages
    and the result of a DNS update is asserted after each stage.
    """
    name, txt = 'agingtest', ['test txt']
    self.set_zone_int_params(NoRefreshInterval=200,
                             RefreshInterval=160,
                             Aging=1)
    start_time = self.dns_update_record(name, txt).dwTimeStamp

    def rewind_and_update(hours):
        # Move the node's timestamps back over LDAP, then return
        # the timestamp of the record a DNS update gives back.
        self.ldap_modify_timestamps(name, -hours)
        return self.dns_update_record(name, txt).dwTimeStamp

    # Go back 170 hours.  That is inside the 200 hour no-refresh
    # interval set above (it would not be inside an interval of
    # 168), so the update is expected to leave the timestamp alone.
    self.assert_timestamps_equal(rewind_and_update(170),
                                 start_time - 170)

    # A further 32 hours takes the record to -202 hours, past the
    # no-refresh interval; the update is expected to reset the
    # timestamp to now.
    self.assert_soon_after(rewind_and_update(32), start_time)

    # Rewind a further 160 hours; the update is again expected to
    # give a current timestamp.
    # NOTE(review): the original comment calls this "-362 hours,
    # beyond the end of the refresh period", which holds only if
    # the record was still at -202 hours before this rewind, yet
    # the previous assertion expects it to have been reset.
    # Confirm which is intended.
    self.assert_soon_after(rewind_and_update(160), start_time)
|
|
|
|
def test_add_no_timestamp(self):
    """Records written over LDAP or RPC with no timestamp should get 0."""
    self.set_aging(True)
    writers = ((self.ldap_update_record, 'ldap'),
               (self.rpc_update_record, 'rpc'))
    for write, host in writers:
        self.assertEqual(write(host, 'test').dwTimeStamp, 0)
|
|
|
|
def test_add_zero_timestamp(self):
    """Records written over LDAP or RPC with dwTimeStamp=0 should keep 0."""
    writers = ((self.ldap_update_record, 'ldap'),
               (self.rpc_update_record, 'rpc'))
    for write, host in writers:
        written = write(host, 'test', dwTimeStamp=0)
        self.assertEqual(written.dwTimeStamp, 0)
|
|
|
|
def test_add_update_timestamp(self):
    """LDAP writes should keep a requested timestamp; RPC writes zero it."""
    wanted = 123456

    over_ldap = self.ldap_update_record('ldap', 'test',
                                        dwTimeStamp=wanted)
    self.assertEqual(over_ldap.dwTimeStamp, wanted)

    # Three RPC writes, each expected to yield a zero timestamp:
    #  - 'rpc' the first time (an add),
    #  - 'rpc' again (an update, a different code path),
    #  - 'ldap', the record that currently has a timestamp.
    for host in ('rpc', 'rpc', 'ldap'):
        over_rpc = self.rpc_update_record(host, 'test',
                                          dwTimeStamp=wanted)
        self.assertEqual(over_rpc.dwTimeStamp, 0)
|
|
|
|
def test_add_update_ttl(self):
    """Both LDAP and RPC writes should be able to set dwTtlSeconds."""
    over_ldap = self.ldap_update_record('ldap', 'test',
                                        dwTtlSeconds=1234)
    self.assertEqual(over_ldap.dwTtlSeconds, 1234)

    # RPC: an add, then an update of the same record, then an
    # update of the record that was written over LDAP.
    rpc_steps = (('rpc', 1234), ('rpc', 4321), ('ldap', 5678))
    for host, ttl in rpc_steps:
        over_rpc = self.rpc_update_record(host, 'test',
                                          dwTtlSeconds=ttl)
        self.assertEqual(over_rpc.dwTtlSeconds, ttl)
|
|
|
|
def test_add_update_ttl_serial(self):
    """Check what happens to dwSerial when dwTtlSeconds is set."""
    over_ldap = self.ldap_update_record('ldap', 'test',
                                        dwTtlSeconds=1234,
                                        dwSerial=123)
    self.assertEqual(over_ldap.dwTtlSeconds, 1234)
    self.assertEqual(over_ldap.dwSerial, 123)

    # A record added over RPC is expected to get a low serial.
    added = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234)
    self.assertEqual(added.dwTtlSeconds, 1234)
    serial = added.dwSerial
    self.assertLess(serial, 4)

    # Updating it is expected to bump the serial by one.
    updated = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321)
    self.assertEqual(updated.dwTtlSeconds, 4321)
    self.assertEqual(updated.dwSerial, serial + 1)

    # An RPC update of the LDAP-written record is expected to
    # continue from that record's serial.
    bumped = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678)
    self.assertEqual(bumped.dwTtlSeconds, 5678)
    self.assertEqual(bumped.dwSerial, 124)
|
|
|
|
def test_add_update_dwFlags(self):
    """Check how rank and flags are stored after LDAP, RPC and DNS writes.

    dwFlags splits into rank and flags; according to [MS-DNSP]
    2.3.2.2, flags MUST be zero.
    """
    host, txt = 'ldap', 'test'
    rank_zone = 240  # RANK_ZONE

    # LDAP is expected to store whatever it is given.
    rec = self.ldap_update_record(host, txt, flags=22222, rank=222)
    self.assertEqual(rec.flags, 22222)
    self.assertEqual(rec.rank, 222)

    rec = self.rpc_update_record(host, txt, dwFlags=3333333)
    # 3333333 & 0xff is 213, but RPC is expected to fix the rank
    # rather than take it from dwFlags.
    self.assertEqual(rec.rank, rank_zone)
    self.assertEqual(rec.flags, 0)

    # flags=777777777 is expected to raise OverflowError (rank=777
    # on its own does not, as the next write shows).
    with self.assertRaises(OverflowError):
        self.ldap_update_record(host, txt, flags=777777777, rank=777)

    # Back to non-default values; rank 777 wraps round to 9.
    rec = self.ldap_update_record(host, txt, flags=7777, rank=777)
    self.assertEqual(rec.flags, 7777)
    self.assertEqual(rec.rank, 9)

    # A DNS update is expected to zero flags and set the rank.
    rec = self.dns_update_record(host, txt, ttl=999)
    self.assertEqual(rec.flags, 0)
    self.assertEqual(rec.rank, rank_zone)

    rec = self.rpc_update_record(host, txt, dwFlags=321)
    self.assertEqual(rec.flags, 0)
    self.assertEqual(rec.rank, rank_zone)

    # RPC adding a new record: fixed rank, zero flags.
    rec = self.rpc_update_record(host, 'test 2', dwFlags=12345)
    self.assertEqual(rec.rank, rank_zone)
    self.assertEqual(rec.flags, 0)
|
|
|
|
def test_add_update_dwReserved(self):
    """RPC writes should neither set nor change dwReserved."""
    over_ldap = self.ldap_update_record('ldap', 'test',
                                        dwReserved=54321)
    self.assertEqual(over_ldap.dwReserved, 54321)

    # Each step is (host, requested value, expected stored value).
    # On the 'rpc' record (add, then update) the request is
    # expected to be ignored, leaving 0; on the 'ldap' record the
    # value written over LDAP is expected to survive.
    rpc_steps = (('rpc', 54321, 0),
                 ('rpc', 54321, 0),
                 ('ldap', 12345, 54321))
    for host, requested, expected in rpc_steps:
        over_rpc = self.rpc_update_record(host, 'test',
                                          dwReserved=requested)
        self.assertEqual(over_rpc.dwReserved, expected)
|
|
|
|
def test_add_update_dwSerial(self):
    """Check dwSerial after a mix of LDAP, RPC and DNS writes.

    On Windows the RPC record ends up with serial 2, on Samba with
    serial 3.  Rather than knownfail this, anything below 4 is
    accepted (for now).
    """
    via_ldap = self.ldap_update_record
    via_rpc = self.rpc_update_record
    via_dns = self.dns_update_record

    self.assertEqual(via_ldap('ldap', 'test', dwSerial=123).dwSerial,
                     123)

    # RPC add, RPC update, then a DNS update of the same record.
    self.assertLess(via_rpc('rpc', 'test', dwSerial=123).dwSerial, 4)
    self.assertLess(via_rpc('rpc', 'test', dwSerial=123).dwSerial, 4)
    self.assertLess(via_dns('rpc', 'test').dwSerial, 4)
    self.assertLess(via_dns('dns-0', 'test').dwSerial, 5)

    # The LDAP-written record is expected to keep serial 123
    # through DNS and RPC updates, and to take 12 from LDAP.
    self.assertEqual(via_dns('ldap', 'test').dwSerial, 123)
    self.assertEqual(via_rpc('ldap', 'test', dwSerial=123).dwSerial,
                     123)
    self.assertEqual(via_ldap('ldap', 'test', dwSerial=12).dwSerial,
                     12)

    # The DNS update of ldap/test alerted Windows to 123 as a high
    # water mark for the zone.  Even though that record's serial
    # has since dropped to 12, 123 is the base serial for new
    # records.
    later_writes = ((via_dns, 'dns', 'test', 124),
                    (via_dns, 'dns2', 'test', 125),
                    (via_rpc, 'rpc2', 'test', 126),
                    (via_dns, 'dns', 'test 2', 127))
    for write, host, txt, expected in later_writes:
        self.assertEqual(write(host, txt).dwSerial, expected)
|
|
|
|
def test_add_update_dwSerial_2(self):
    """A serial written over LDAP should survive RPC and DNS updates.

    On Samba the RPC update resets the serial to a low number,
    while Windows leaves it high.
    """
    host, txt, serial = 'ldap', 'test', 123

    written = self.ldap_update_record(host, txt, dwSerial=serial)
    self.assertEqual(written.dwSerial, serial)

    # The RPC request for 321 is expected to be ignored.
    after_rpc = self.rpc_update_record(host, txt, dwSerial=321)
    self.assertEqual(after_rpc.dwSerial, serial)

    after_dns = self.dns_update_record(host, txt)
    self.assertEqual(after_dns.dwSerial, serial)
|
|
|
|
def test_rpc_update_disparate_types(self):
    """Can we use update to replace a TXT with an A?"""
    name = 'x'
    old = TXTRecord("x")
    new = ARecord("127.0.0.111")

    # First add the TXT record (nothing to replace), then replace
    # it with the A record.  After each step the node is expected
    # to hold exactly one record, of the type just written.
    for previous, current in ((None, old), (old, new)):
        self.rpc_replace(name, previous, current)
        recs = self.ldap_get_records(name)
        self.assertEqual(len(recs), 1)
        self.assertEqual(recs[0].wType, current.wType)
|
|
|
|
def test_add_update_many(self):
    # Samba fails often in this set, but we want to see how it
    # goes further down, so we print the problems and defer the
    # failure.
    from traceback import format_stack

    failures = 0
    total = 0

    def _deferring(check):
        # Wrap an assertion method so that a failure is printed and
        # counted instead of aborting the test.
        def _run(*args):
            nonlocal failures, total
            total += 1
            try:
                check(*args)
            except self.failureException as err:
                # format_stack()[-1] is this wrapper; [-2] is the
                # line in the test body that called it.
                print(f"{format_stack()[-2]} {err}\n")
                failures += 1
        return _run

    defer_assertEqual = _deferring(self.assertEqual)
    defer_assert_timestamp_in_ballpark = _deferring(
        self.assert_timestamp_in_ballpark)

    self.set_aging(False)
    # Start from a record written over LDAP with every header field
    # set to a distinctive value.
    requested = dict(version=11,
                     rank=22,
                     flags=33,
                     dwSerial=44,
                     dwTtlSeconds=55,
                     dwReserved=66,
                     dwTimeStamp=77)
    rec = self.ldap_update_record('ldap', 'test', **requested)

    self.assertEqual(rec.version, 5)   # disobeys request
    self.assertEqual(rec.rank, 22)
    self.assertEqual(rec.flags, 33)
    self.assertEqual(rec.dwSerial, 44)
    self.assertEqual(rec.dwTtlSeconds, 55)
    self.assertEqual(rec.dwReserved, 66)
    self.assertEqual(rec.dwTimeStamp, 77)

    # DNS updates first
    rec = self.dns_update_record('ldap', 'test', ttl=999)
    self.assertEqual(rec.version, 5)
    self.assertEqual(rec.rank, 240)         # rank gets fixed by DNS update
    defer_assertEqual(rec.flags, 0)         # flags gets fixed
    defer_assertEqual(rec.dwSerial, 45)     # serial increments
    self.assertEqual(rec.dwTtlSeconds, 999)  # TTL set
    defer_assertEqual(rec.dwReserved, 0)    # reserved fixed
    defer_assert_timestamp_in_ballpark(rec)  # changed on Windows ?!

    # The same DNS update again, this time with aging switched on.
    self.set_aging(True)
    rec = self.dns_update_record('ldap', 'test', ttl=1111)
    self.assertEqual(rec.version, 5)
    self.assertEqual(rec.rank, 240)
    defer_assertEqual(rec.flags, 0)
    defer_assertEqual(rec.dwSerial, 46)
    self.assertEqual(rec.dwTtlSeconds, 1111)  # TTL set
    defer_assertEqual(rec.dwReserved, 0)
    self.assert_timestamp_in_ballpark(rec)

    # RPC update
    rec = self.rpc_update_record('ldap', 'test',
                                 version=111,
                                 dwFlags=333,
                                 dwSerial=444,
                                 dwTtlSeconds=555,
                                 dwReserved=666,
                                 dwTimeStamp=777)

    self.assertEqual(rec.version, 5)        # no change
    self.assertEqual(rec.rank, 240)         # no change
    defer_assertEqual(rec.flags, 0)         # no change
    defer_assertEqual(rec.dwSerial, 47)     # Serial increments
    self.assertEqual(rec.dwTtlSeconds, 555)  # TTL set
    defer_assertEqual(rec.dwReserved, 0)    # no change
    self.assertEqual(rec.dwTimeStamp, 0)    # timestamp zeroed

    # RPC update, using default values
    rec = self.rpc_update_record('ldap', 'test')
    self.assertEqual(rec.version, 5)
    self.assertEqual(rec.rank, 240)
    defer_assertEqual(rec.flags, 0)
    defer_assertEqual(rec.dwSerial, 48)     # serial increments
    self.assertEqual(rec.dwTtlSeconds, 900)  # TTL changed
    defer_assertEqual(rec.dwReserved, 0)
    self.assertEqual(rec.dwTimeStamp, 0)

    # Back to no aging for a final DNS update.
    self.set_aging(False)
    rec = self.dns_update_record('ldap', 'test', ttl=888)
    self.assertEqual(rec.version, 5)
    self.assertEqual(rec.rank, 240)
    defer_assertEqual(rec.flags, 0)
    defer_assertEqual(rec.dwSerial, 49)     # serial increments
    self.assertEqual(rec.dwTtlSeconds, 888)  # TTL set
    defer_assertEqual(rec.dwReserved, 0)
    self.assertEqual(rec.dwTimeStamp, 0)    # timestamp stays zero

    # Now report everything that was deferred, as a single failure.
    if failures:
        self.fail(f"failed {failures}/{total} defered assertions")
|
|
|
|
def test_static_record_dynamic_update(self):
    """Add a static record, then a dynamic record.
    The dynamic record should have a timestamp set."""
    name = 'test'
    txt = ['static txt']
    txt2 = ['dynamic txt']
    self.set_aging(True)

    # The static record is written over LDAP with a zero timestamp;
    # the dynamic sibling arrives by DNS update.
    self.ldap_update_record(name, txt, dwTimeStamp=0)
    rec2 = self.dns_update_record(name, txt2)
    self.assert_timestamp_in_ballpark(rec2)
    ts2 = rec2.dwTimeStamp

    # update the first record. It should stay static (timestamp 0)
    rec = self.dns_update_record(name, txt)
    self.assertEqual(rec.dwTimeStamp, 0)

    # and rec2 should be unchanged. It has to be read back from the
    # server: comparing the object we already hold with ts2 (which
    # was copied from that same object) could never fail.
    rec2 = self.get_unique_txt_record(name, txt2)
    self.assertIsNotNone(rec2)
    self.assertEqual(rec2.dwTimeStamp, ts2)
|
|
|
|
def test_dynamic_record_static_update(self):
    # Interleave DNS updates with an LDAP update on one node, then
    # check each record's timestamp: zero for the one written over
    # LDAP, non-zero for the two written by DNS update.
    name = 'agingtest'
    txt1 = ['dns update before']
    txt2 = ['ldap update']
    txt3 = ['dns update after']
    self.set_aging(True)

    self.dns_update_record(name, txt1)
    self.ldap_update_record(name, txt2)
    self.dns_update_record(name, txt3)

    via_dns = (txt1, txt3)
    for rec in self.get_rpc_records(name):
        strings = [s.str for s in rec.data.str]
        if strings == txt2:
            self.assertEqual(rec.dwTimeStamp, 0)
        elif strings in via_dns:
            self.assertNotEqual(rec.dwTimeStamp, 0)
        # any other record on the node is not checked
|
|
|
|
def test_tombstone_in_hours_and_nttime(self):
    # Until now Samba has measured tombstone timestamps in hours,
    # not ten-millionths of a second. After now, we want Samba to
    # handle both.

    # Nine single-letter node names, one per tombstone variant.
    nh, oh, nn, on, on0, onf, nn0, nnf, _1601 = 'abcdefgij'

    hours_now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
    hours_old = hours_now - 24 * 90
    nttime_now = dsdb_dns.dns_timestamp_to_nt_time(hours_now)
    nttime_old = dsdb_dns.dns_timestamp_to_nt_time(hours_old)

    # calculations on hours might be based on the lower 32 bits,
    # so we test with these forced to extremes (the maximum change
    # is 429 seconds in NTTIME).
    upper_only = 0xffffffff00000000
    lower_all = 0xffffffff

    tombstones = (
        (nh, hours_now),
        (oh, hours_old),
        (nn, nttime_now),
        (on, nttime_old),
        (nn0, nttime_now & upper_only),
        (nnf, nttime_now | lower_all),
        (on0, nttime_old & upper_only),
        (onf, nttime_old | lower_all),
        # this is our (arbitrary) threshold that will make us think in
        # NTTIME, not hours.
        (_1601, (10 * 1000 * 1000 + 1)),
    )
    for node, stamp in tombstones:
        self.dns_tombstone(node, epoch_nttime=stamp)

    try:
        file_samdb = get_file_samdb()
    except ldb.LdbError as e:
        raise AssertionError(
            f"failing because '{e}': this is Windows?") from None
    dsdb._dns_delete_tombstones(file_samdb)

    # the "now" tombstones should not be deleted
    for survivor in (nh, nn, nn0, nnf):
        recs = self.ldap_get_records(survivor)
        self.assertEqual(len(recs), 1)
        self.assert_tombstoned(survivor, timestamp=False)

    # the "old" ones (and the 1601 one) should be GONE
    for victim in (oh, on, on0, onf, _1601):
        recs = self.ldap_get_records(victim)
        self.assertEqual(len(recs), 0)
|
|
|
|
def test_dns_query_for_tombstoned_results(self):
    # This one fails on Windows, because the dns cache holds B
    # after it has been tombstoned behind its back.
    never_existed = 'a'
    existed = 'b'

    # A node that is tombstoned without ever holding a record must
    # give an empty answer.
    self.dns_tombstone(never_existed)
    self.assert_tombstoned(never_existed)
    reply = self.dns_query(never_existed, qtype=dns.DNS_QTYPE_TXT)
    self.assertEqual(reply.ancount, 0)

    # So must a node that held a TXT record before it was tombstoned.
    self.dns_update_record(existed, existed)
    self.dns_tombstone(existed)
    self.assert_tombstoned(existed)
    reply = self.dns_query(existed, qtype=dns.DNS_QTYPE_TXT)
    self.assertEqual(reply.ancount, 0)
|
|
|
|
def test_basic_scavenging(self):
    # NOTE: This one fails on Windows, because the RPC call to
    # prompt scavenging is not immediate. On Samba, in the
    # testenv, we don't have the RPC call but we can connect to
    # the database directly.

    # just to be sure we have the right limits.
    self.set_zone_int_params(NoRefreshInterval=168,
                             RefreshInterval=168,
                             Aging=1)

    ts1, ts2, ts3, ts4, ts5, ts6 = '123456'
    for live in (ts1, ts2):
        self.dns_update_record(live, live)

    # ts2 is tombstoned and timestamped in 1981
    self.dns_tombstone(ts2)
    # ts3 is tombstoned and timestamped in the future
    self.dns_tombstone(ts3, epoch_hours=(DNS_TIMESTAMP_2101 - 1))
    # ts4 is tombstoned and timestamped in the past
    self.dns_tombstone(ts4, epoch_hours=1111111)
    # ts5 is tombstoned in the past and timestamped in the future
    self.dns_tombstone(ts5, epoch_hours=5555555, epoch_nttime=int(1e10))

    # ts2 and ts3 should now be tombstoned.
    for node in (ts2, ts3):
        self.assert_tombstoned(node)

    # let's un-tombstone ts2
    # ending up with dnsTombstoned: FALSE in Samba
    # and no dNSTombstoned in Windows.
    self.dns_update_record(ts2, "ts2 untombstoned")
    tombstone_attr = self.get_one_node(ts2).get("dNSTombstoned")
    if tombstone_attr is not None:
        self.assertEqual(tombstone_attr[0], b"FALSE")

    self.assert_tombstoned(ts2, tombstoned=False)

    self.dns_update_record(ts6, ts6)

    # put some records into the death zone.
    self.ldap_modify_timestamps(ts1, -15 * 24)
    self.ldap_modify_timestamps(ts2, -14 * 24 - 2)
    self.ldap_modify_timestamps(ts6, -14 * 24 + 2)

    # ts1 will be saved by this record
    self.dns_update_record(ts1, "another record")

    try:
        # Tell the server to clean-up records.
        # This is how it *should* work on Windows:
        self.rpc_conn.DnssrvOperation2(
            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
            0,
            SERVER_IP,
            None,
            0,
            "StartScavenging",
            dnsserver.DNSSRV_TYPEID_NULL,
            None)
        # Samba won't get here (NOT_IMPLEMENTED error)
        # wait for Windows to do its cleanup.
        time.sleep(2)
    except WERRORError as e:
        if e.args[0] != werror.WERR_CALL_NOT_IMPLEMENTED:
            raise
        # This is the Samba way, talking to the file directly,
        # as if we were the server process. The direct
        # connection is needed because the tombstoning search
        # involves a magic system only filter.
        file_samdb = get_file_samdb()
        dsdb._scavenge_dns_records(file_samdb)
        dsdb._dns_delete_tombstones(file_samdb)

    # Now what we should have:
    # ts1: alive: the old record is deleted, the new one not.
    # ts2: tombstoned
    # ts3: tombstoned
    # ts4: deleted. gone.
    # ts5: deleted. timestamp affects tombstoning, but not deletion.
    # ts6: alive
    #
    # We order our assertions to make the windows test
    # fail as late as possible (on ts4, ts5, ts2).
    survivor = self.get_unique_txt_record(ts1, ["another record"])
    self.assertIsNotNone(survivor)
    survivor = self.get_unique_txt_record(ts6, [ts6])
    self.assertIsNotNone(survivor)

    self.assert_tombstoned(ts3)

    for gone in (ts4, ts5):
        self.assertIsNone(self.get_one_node(gone))

    self.assert_tombstoned(ts2)
|
|
|
|
def test_samba_scavenging(self):
    # We expect this one to fail on Windows, because scavenging
    # and tombstoning cannot be performed on demand.

    try:
        file_samdb = get_file_samdb()
    except ldb.LdbError as e:
        raise AssertionError(
            f"failing because '{e}': this is Windows?") from None

    # let's try different limits.
    self.set_zone_int_params(NoRefreshInterval=30,
                             RefreshInterval=20,
                             Aging=1)

    now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))

    A, B, C, D = 'ABCD'
    # A has current time
    # B has safe, non-updateable time
    # C has safe time
    # D is scavengeable
    atime = self.dns_update_record(A, A).dwTimeStamp
    btime = self.ldap_update_record(B, B, dwTimeStamp=now-20).dwTimeStamp
    # C's timestamp must land in ctime: storing it in btime left
    # ctime undefined and made the B check look at C's value.
    ctime = self.ldap_update_record(C, C, dwTimeStamp=now-40).dwTimeStamp
    dtime = self.ldap_update_record(D, D, dwTimeStamp=now-60).dwTimeStamp
    self.assert_soon_after(atime, now)
    self.assert_timestamps_equal(btime, now-20)
    self.assert_timestamps_equal(ctime, now-40)
    self.assert_timestamps_equal(dtime, now-60)

    dsdb._scavenge_dns_records(file_samdb)

    # D should be gone (tombstoned)
    r = self.get_unique_txt_record(D, D)
    self.assertIsNone(r)
    # Query through the test-case method, as every other test does.
    r = self.dns_query(D, qtype=dns.DNS_QTYPE_TXT)
    self.assertEqual(r.ancount, 0)
    recs = self.ldap_get_records(D)
    self.assertEqual(len(recs), 1)
    # NOTE(review): other call sites hand assert_tombstoned() a node
    # name rather than a record -- confirm this form is accepted.
    self.assert_tombstoned(recs[0])

    # others unchanged.
    atime = self.get_unique_txt_record(A, A).dwTimeStamp
    btime = self.get_unique_txt_record(B, B).dwTimeStamp
    ctime = self.get_unique_txt_record(C, C).dwTimeStamp
    self.assert_soon_after(atime, now)
    self.assert_timestamps_equal(btime, now-20)
    self.assert_timestamps_equal(ctime, now-40)

    # A DNS update of B (20 hours old, described above as
    # non-updateable) is expected to leave its timestamp at now-20,
    # while C (40 hours old) is refreshed to the current time.
    btime = self.dns_update_record(B, B).dwTimeStamp
    ctime = self.dns_update_record(C, C).dwTimeStamp
    self.assert_timestamps_equal(btime, now-20)
    self.assert_soon_after(ctime, now)

    # after this, D *should* still be a tombstone, because its
    # tombstone timestamp is not very old.
    dsdb._dns_delete_tombstones(file_samdb)
    recs = self.ldap_get_records(D)
    self.assertEqual(len(recs), 1)
    self.assert_tombstoned(recs[0])

    # Let's delete C using rpc, and ensure it survives dns_delete_tombstones
    self.rpc_delete_txt(C, C)
    recs = self.ldap_get_records(C)
    self.assertEqual(len(recs), 1)
    self.assert_tombstoned(recs[0])
    dsdb._dns_delete_tombstones(file_samdb)
    recs = self.ldap_get_records(C)
    self.assertEqual(len(recs), 1)
    self.assert_tombstoned(recs[0])

    # now let's wind A and B back to either side of the two week
    # threshold. A should survive, B should not.
    self.dns_tombstone(A, (now - 166))
    self.dns_tombstone(B, (now - 170))
    dsdb._dns_delete_tombstones(file_samdb)

    recs = self.ldap_get_records(A)
    self.assertEqual(len(recs), 1)
    self.assert_tombstoned(recs[0])

    recs = self.ldap_get_records(B)
    self.assertEqual(len(recs), 0)
|
|
|
|
def _test_A_and_AAAA_records(self, A, B, a_days, b_days, aging):
    # Put two address records, A and B, on one node with timestamps
    # a_days and b_days in the past, then check how DNS updates of
    # one affect the timestamps of both.
    #
    # A, B:            address strings for the two records
    # a_days, b_days:  initial age of each record, in days
    # aging:           whether aging is enabled on the zone
    self.set_aging(aging)

    name = 'aargh'
    now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
    a_initial = now - 24 * a_days
    b_initial = now - 24 * b_days

    # A is created by DNS update and then wound back over LDAP; B is
    # added with its old timestamp already in place.
    self.dns_update_non_text(name, A)
    self.ldap_modify_timestamps(name, a_days * -24)

    rec_a = self.get_unique_ip_record(name, A)
    rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)

    self.assert_timestamps_equal(rec_a, a_initial)
    self.assert_timestamps_equal(rec_b, b_initial)

    # touch the A record.
    self.dns_update_non_text(name, A)

    # check the A timestamp, depending on norefresh
    rec_a = self.get_unique_ip_record(name, A)
    if aging and a_days > 7:
        time_a = now
        self.assert_soon_after(rec_a, now)
    elif a_days > 7:
        # when we have NO aging and are in the refresh window, the
        # timestamp now reads as a_initial, but will become now
        # after we manipulate B for a bit.
        time_a = now
        self.assert_timestamps_equal(rec_a, a_initial)
    else:
        time_a = a_initial
        self.assert_timestamps_equal(rec_a, a_initial)

    # B timestamp should be unchanged?
    rec_b = self.get_unique_ip_record(name, B)
    self.assert_timestamps_equal(rec_b, b_initial)

    # touch the B record.
    self.dns_update_non_text(name, B)

    # check the B timestamp
    rec_b = self.get_unique_ip_record(name, B)
    if not aging:
        self.windows_variation(
            self.assert_soon_after, rec_b, now,
            msg="windows updates non-aging, samba does not")
    else:
        self.assert_soon_after(rec_b, now)

    # rewind B
    rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)

    # NOW rec A might have changed! with no aging, and out of refresh.
    rec_a = self.get_unique_ip_record(name, A)
    self.assert_timestamps_equal(rec_a, time_a)

    self.dns_update_non_text(name, A)

    # Re-read B after touching A. The result has to go into rec_b:
    # binding it to rec_a left the assertion below checking the
    # stale record returned by add_ip_record() above.
    rec_b = self.get_unique_ip_record(name, B)
    self.assert_timestamps_equal(rec_b, b_initial)

    # now delete A
    _, wtype = guess_wtype(A)
    self.ldap_delete_record(name, A, wtype=wtype)

    # re-add it
    self.dns_update_non_text(name, A)

    rec_a = self.get_unique_ip_record(name, A)
    self.assert_soon_after(rec_a, now)

    rec_b = self.get_unique_ip_record(name, B)
    self.assert_timestamps_equal(rec_b, b_initial)
|
|
|
|
# Each test below runs _test_A_and_AAAA_records() with one combination
# of address pair, record ages (in days) and aging setting.

def test_A_5_days_AAAA_5_days_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=5, b_days=5, aging=True)

def test_A_5_days_AAAA_5_days_no_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=5, b_days=5, aging=False)

def test_A_5_days_AAAA_10_days_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=5, b_days=10, aging=True)

def test_A_5_days_AAAA_10_days_no_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=5, b_days=10, aging=False)

def test_A_10_days_AAAA_5_days_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=10, b_days=5, aging=True)

def test_A_10_days_AAAA_5_days_no_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=10, b_days=5, aging=False)

def test_A_10_days_AAAA_9_days_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=10, b_days=9, aging=True)

def test_A_9_days_AAAA_10_days_no_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=9, b_days=10, aging=False)

def test_A_20_days_AAAA_2_days_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=20, b_days=2, aging=True)

def test_A_6_days_AAAA_40_days_no_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR,
                                  a_days=6, b_days=40, aging=False)

def test_A_5_days_A_5_days_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2,
                                  a_days=5, b_days=5, aging=True)

def test_A_5_days_A_10_days_no_aging(self):
    self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2,
                                  a_days=5, b_days=10, aging=False)

def test_AAAA_5_days_AAAA_6_days_aging(self):
    self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2,
                                  a_days=5, b_days=6, aging=True)

def test_AAAA_5_days_AAAA_6_days_no_aging(self):
    self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2,
                                  a_days=5, b_days=6, aging=False)
|
|
|
|
def _test_multi_records_delete(self, aging):
    # Batch deleting a type doesn't update other types timestamps.
    #
    # aging: whether aging is enabled on the zone for this run.
    self.set_aging(aging)

    name = 'aargh'
    now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))

    back_5_days = now - 5 * 24
    back_10_days = now - 10 * 24
    back_25_days = now - 25 * 24

    # record value -> timestamp it is created with
    ip4s = {
        '1.1.1.1': now,
        '2.2.2.2': back_5_days,
        '3.3.3.3': back_10_days,
    }
    ip6s = {
        '::1': now,
        '::2': back_5_days,
        '::3': back_25_days,
    }
    txts = {
        '1': now,
        '2': back_5_days,
        '3': back_25_days,
    }

    # For windows, if we don't DNS update something, it won't know
    # there's anything.
    self.dns_update_record(name, '3')

    for wtype, addresses in ((dns.DNS_QTYPE_A, ip4s),
                             (dns.DNS_QTYPE_AAAA, ip6s)):
        for address, stamp in addresses.items():
            self.add_ip_record(name, address, wtype=wtype,
                               dwTimeStamp=stamp)

    for text, stamp in txts.items():
        self.ldap_update_record(name, text, dwTimeStamp=stamp)

    # Remove every A record in one go.
    self.dns_delete_type(name, dnsp.DNS_TYPE_A)

    reply = self.dns_query(name, dns.DNS_QTYPE_A)
    self.assertEqual(reply.ancount, 0)

    # The TXT and AAAA record sets must still be complete.
    reply = self.dns_query(name, dns.DNS_QTYPE_TXT)
    self.assertEqual(reply.ancount, 3)
    seen = {answer.rdata.txt.str[0] for answer in reply.answers}
    self.assertEqual(seen, set(txts))

    reply = self.dns_query(name, dns.DNS_QTYPE_AAAA)
    self.assertEqual(reply.ancount, 3)
    seen = {ipv6_normalise(answer.rdata) for answer in reply.answers}
    self.assertEqual(seen, set(ip6s))

    # And each surviving record must still carry the timestamp it was
    # created with.
    survivors = self.ldap_get_records(name)
    self.assertEqual(len(survivors), 6)
    for rec in survivors:
        if rec.wType == dns.DNS_QTYPE_AAAA:
            expected = ip6s[ipv6_normalise(rec.data)]
        elif rec.wType == dns.DNS_QTYPE_TXT:
            expected = txts[rec.data.str[0]]
        else:
            self.fail(f"unexpected wType {rec.wType}")

        self.assert_timestamps_equal(rec.dwTimeStamp, expected)
|
|
|
|
# _test_multi_records_delete(), with aging on and with aging off.

def test_multi_records_delete_aging(self):
    self._test_multi_records_delete(aging=True)

def test_multi_records_delete_no_aging(self):
    self._test_multi_records_delete(aging=False)
|
|
|
|
def _test_dns_delete_times(self, n_days, aging=True):
    # In these tests, Windows replaces the records with
    # tombstones, while Samba just removes them. Both are
    # reasonable approaches (there is no reanimation pathway for
    # tombstones), but this means self.ldap_get_records() gets
    # different numbers for each. So we use
    # self.ldap_get_non_tombstoned_records().
    #
    # n_days: age given to records A, B and C, in days (clamped so
    #         the resulting timestamp is never below zero)
    # aging:  whether aging is enabled on the zone
    name = 'test'
    A = ['A']
    B = ['B']
    C = ['C']
    D = ['D']
    self.set_aging(aging)
    now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
    n_days_ago = max(now - n_days * 24, 0)

    def answered_txts():
        # the set of first TXT strings currently served for the node
        reply = self.dns_query(name, dns.DNS_QTYPE_TXT)
        return {x.rdata.txt.str[0] for x in reply.answers}

    def current(txt):
        # the record as it is stored right now
        return self.get_unique_txt_record(name, txt)

    self.dns_update_record(name, A)
    for old_txt in (A, B, C):
        self.ldap_update_record(name, old_txt, dwTimeStamp=n_days_ago)
    self.dns_update_record(name, D)
    self.assertEqual(answered_txts(), set('ABCD'))

    atime = current(A).dwTimeStamp
    btime = current(B).dwTimeStamp
    ctime = current(C).dwTimeStamp
    dtime = current(D).dwTimeStamp  # fetched only to show D is there
    recs = self.ldap_get_records(name)
    self.assertEqual(len(recs), 4)
    self.assertEqual(answered_txts(), set('ABCD'))

    # Delete D by DNS update: A, B and C keep their timestamps.
    self.dns_delete(name, D)
    self.assert_timestamps_equal(atime, current(A))
    self.assert_timestamps_equal(btime, current(B))
    self.assert_timestamps_equal(ctime, current(C))
    recs = self.ldap_get_non_tombstoned_records(name)
    self.assertEqual(len(recs), 3)
    self.assertEqual(answered_txts(), set('ABC'))

    # Delete C over RPC: A and B keep their timestamps.
    self.rpc_delete_txt(name, C)
    self.assert_timestamps_equal(atime, current(A))
    self.assert_timestamps_equal(btime, current(B))
    recs = self.ldap_get_non_tombstoned_records(name)
    self.assertEqual(len(recs), 2)
    self.assertEqual(answered_txts(), set('AB'))

    # Delete A by DNS update: B keeps its timestamp.
    self.dns_delete(name, A)
    self.assert_timestamps_equal(btime, current(B))
    recs = self.ldap_get_records(name)
    self.assertEqual(len(recs), 1)
    self.assertEqual(answered_txts(), {'B'})

    # Delete the last record.
    self.dns_delete(name, B)
    recs = self.ldap_get_non_tombstoned_records(name)
    # Windows leaves the node with zero records. Samba ends up
    # with a tombstone.
    self.assertEqual(len(recs), 0)
    self.assertEqual(len(answered_txts()), 0)
|
|
|
|
# _test_dns_delete_times() for a range of record ages, with aging on
# and off. The "static" variants pass an age so large that the
# timestamp is clamped to 0.

def test_dns_delete_times_5_days_aging(self):
    self._test_dns_delete_times(n_days=5, aging=True)

def test_dns_delete_times_11_days_aging(self):
    self._test_dns_delete_times(n_days=11, aging=True)

def test_dns_delete_times_366_days_aging(self):
    self._test_dns_delete_times(n_days=366, aging=True)

def test_dns_delete_times_static_aging(self):
    self._test_dns_delete_times(n_days=1e10, aging=True)

def test_dns_delete_times_5_days_no_aging(self):
    self._test_dns_delete_times(n_days=5, aging=False)

def test_dns_delete_times_11_days_no_aging(self):
    self._test_dns_delete_times(n_days=11, aging=False)

def test_dns_delete_times_366_days_no_aging(self):
    self._test_dns_delete_times(n_days=366, aging=False)

def test_dns_delete_times_static_no_aging(self):
    self._test_dns_delete_times(n_days=1e10, aging=False)
|
|
|
|
def _test_dns_delete_simple(self, a_days, b_days, aging=True, touch=False):
    # Here we show that with aging enabled, the timestamp of
    # sibling records is *not* modified when a record is deleted.
    #
    # With aging disabled, it *is* modified, if the dns server has
    # seen it updated before ldap set the time (that is, probably
    # the dns server overwrites AD). This happens even if AD
    # thinks the record is static.
    #
    # a_days, b_days: age given to records A and B, in days (clamped
    #                 so the resulting timestamp is never below zero)
    # aging:          whether aging is enabled on the zone
    # touch:          DNS-update both records before LDAP sets the
    #                 timestamps
    name = 'test'
    A = ['A']
    B = ['B']
    self.set_aging(aging)
    now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
    stamps = {
        'A': max(now - a_days * 24, 0),
        'B': max(now - b_days * 24, 0),
    }

    if touch:
        for txt in (A, B):
            self.dns_update_record(name, txt)

    self.ldap_update_record(name, A, dwTimeStamp=stamps['A'])
    self.ldap_update_record(name, B, dwTimeStamp=stamps['B'])

    atime = self.get_unique_txt_record(name, A).dwTimeStamp

    self.dns_delete(name, B)
    a_after = self.get_unique_txt_record(name, A)
    if aging or not touch:
        self.assert_timestamps_equal(a_after, atime)
    else:
        # this resets the timestamp even if it is a static record.
        self.assert_soon_after(a_after, now)
|
|
|
|
# _test_dns_delete_simple() for a range of ages of the surviving (A)
# and deleted (B) records, with aging on and off, first without and
# then with the initial DNS "touch". An age of 1e9 days clamps the
# timestamp to 0, i.e. a static record.
#
# Each test is defined exactly once: the 112_113 variants used to be
# defined twice, with the second definition silently replacing the
# first.

def test_dns_delete_simple_2_3_days_aging(self):
    self._test_dns_delete_simple(2, 3, aging=True)

def test_dns_delete_simple_2_3_days_no_aging(self):
    self._test_dns_delete_simple(2, 3, aging=False)

def test_dns_delete_simple_2_13_days_aging(self):
    self._test_dns_delete_simple(2, 13, aging=True)

def test_dns_delete_simple_2_13_days_no_aging(self):
    self._test_dns_delete_simple(2, 13, aging=False)

def test_dns_delete_simple_12_13_days_aging(self):
    self._test_dns_delete_simple(12, 13, aging=True)

def test_dns_delete_simple_12_13_days_no_aging(self):
    self._test_dns_delete_simple(12, 13, aging=False)

def test_dns_delete_simple_112_113_days_aging(self):
    self._test_dns_delete_simple(112, 113, aging=True)

def test_dns_delete_simple_112_113_days_no_aging(self):
    self._test_dns_delete_simple(112, 113, aging=False)

def test_dns_delete_simple_0_113_days_aging(self):
    # 1e9 days ago evaluates to 0, i.e static
    self._test_dns_delete_simple(1e9, 113, aging=True)

def test_dns_delete_simple_0_113_days_no_aging(self):
    self._test_dns_delete_simple(1e9, 113, aging=False)

def test_dns_delete_simple_0_0_days_aging(self):
    self._test_dns_delete_simple(1e9, 1e9, aging=True)

def test_dns_delete_simple_0_0_days_no_aging(self):
    self._test_dns_delete_simple(1e9, 1e9, aging=False)

def test_dns_delete_simple_10_0_days_aging(self):
    self._test_dns_delete_simple(10, 1e9, aging=True)

def test_dns_delete_simple_10_0_days_no_aging(self):
    self._test_dns_delete_simple(10, 1e9, aging=False)

def test_dns_delete_simple_2_3_days_aging_touch(self):
    self._test_dns_delete_simple(2, 3, aging=True, touch=True)

def test_dns_delete_simple_2_3_days_no_aging_touch(self):
    self._test_dns_delete_simple(2, 3, aging=False, touch=True)

def test_dns_delete_simple_2_13_days_aging_touch(self):
    self._test_dns_delete_simple(2, 13, aging=True, touch=True)

def test_dns_delete_simple_2_13_days_no_aging_touch(self):
    self._test_dns_delete_simple(2, 13, aging=False, touch=True)

def test_dns_delete_simple_12_13_days_aging_touch(self):
    self._test_dns_delete_simple(12, 13, aging=True, touch=True)

def test_dns_delete_simple_12_13_days_no_aging_touch(self):
    self._test_dns_delete_simple(12, 13, aging=False, touch=True)

def test_dns_delete_simple_112_113_days_aging_touch(self):
    self._test_dns_delete_simple(112, 113, aging=True, touch=True)

def test_dns_delete_simple_112_113_days_no_aging_touch(self):
    self._test_dns_delete_simple(112, 113, aging=False, touch=True)

def test_dns_delete_simple_0_113_days_aging_touch(self):
    # 1e9 days ago evaluates to 0, i.e static
    self._test_dns_delete_simple(1e9, 113, aging=True, touch=True)

def test_dns_delete_simple_0_113_days_no_aging_touch(self):
    self._test_dns_delete_simple(1e9, 113, aging=False, touch=True)

def test_dns_delete_simple_0_0_days_aging_touch(self):
    self._test_dns_delete_simple(1e9, 1e9, aging=True, touch=True)

def test_dns_delete_simple_0_0_days_no_aging_touch(self):
    self._test_dns_delete_simple(1e9, 1e9, aging=False, touch=True)

def test_dns_delete_simple_10_0_days_aging_touch(self):
    self._test_dns_delete_simple(10, 1e9, aging=True, touch=True)

def test_dns_delete_simple_10_0_days_no_aging_touch(self):
    self._test_dns_delete_simple(10, 1e9, aging=False, touch=True)
|
|
|
|
def windows_variation(self, fn, *args, msg=None, **kwargs):
    # Run fn(*args, **kwargs), an assertion that describes Windows
    # behaviour, and report the outcome on stdout without ever failing
    # the test: an AssertionError from fn is caught and printed, and a
    # pass is printed as a known Windows difference (followed by msg,
    # if one was given).
    try:
        fn(*args, **kwargs)
    except AssertionError as e:
        print("Expected success on Windows only, failed as expected:\n" +
              c_GREEN(e))
    else:
        print(c_RED("known Windows failure"))
        if msg is not None:
            print(c_DARK_YELLOW(msg))
        print("Expected success on Windows:\n" +
              c_GREEN(f"{fn.__name__} {args} {kwargs}"))
|
|
|
|
def _test_dns_add_sibling(self, a_days, refresh, aging=True, touch=False):
    # Here we look at what happens to the timestamp of an existing
    # record (A) when a sibling (B) is added by DNS update.
    #
    # With aging enabled, or when the dns server has not seen A
    # updated before ldap set its time, A is expected to keep its
    # timestamp. With aging disabled and a prior DNS update of A, a
    # non-static A is expected to be reset to the current time.
    #
    # a_days:  age given to A, in days (clamped so the resulting
    #          timestamp is never below zero)
    # refresh: RefreshInterval to set on the zone
    # aging:   whether aging is enabled on the zone
    # touch:   DNS-update A before LDAP sets its timestamp
    name = 'test'
    A = ['A']
    B = ['B']
    self.set_zone_int_params(RefreshInterval=int(refresh),
                             NoRefreshInterval=7,
                             Aging=int(aging))

    now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
    a_days_ago = max(now - a_days * 24, 0)

    if touch:
        self.dns_update_record(name, A)

    self.ldap_update_record(name, A, dwTimeStamp=a_days_ago)

    atime = self.get_unique_txt_record(name, A).dwTimeStamp

    self.dns_update_record(name, B)
    a_rec = self.get_unique_txt_record(name, A)
    if aging or not touch:
        self.assert_timestamps_equal(a_rec, atime)
    elif a_days_ago == 0:
        # On Windows, this resets the timestamp even if it is a
        # static record, though in that case it may be a
        # transitory effect of the DNS cache. We will insist on
        # the Samba behaviour of not changing (that is
        # un-static-ing) a zero timestamp, because that is the
        # sensible thing.
        self.windows_variation(
            self.assert_soon_after, a_rec, now,
            msg="Windows resets static siblings (cache effect?)")
        self.assert_timestamps_equal(a_rec, 0)
    else:
        self.assert_soon_after(a_rec, now)

    # The freshly added sibling always gets a current timestamp.
    b_rec = self.get_unique_txt_record(name, B)
    self.assert_soon_after(b_rec, now)
|
|
|
|
def test_dns_add_sibling_2_7_days_aging(self):
|
|
self._test_dns_add_sibling(2, 7, True)
|
|
|
|
def test_dns_add_sibling_2_7_days_no_aging(self):
|
|
self._test_dns_add_sibling(2, 7, False)
|
|
|
|
def test_dns_add_sibling_12_7_days_aging(self):
|
|
self._test_dns_add_sibling(12, 7, True)
|
|
|
|
def test_dns_add_sibling_12_7_days_no_aging(self):
|
|
self._test_dns_add_sibling(12, 7, False)
|
|
|
|
def test_dns_add_sibling_12_3_days_aging(self):
|
|
self._test_dns_add_sibling(12, 3, True)
|
|
|
|
def test_dns_add_sibling_12_3_days_no_aging(self):
|
|
self._test_dns_add_sibling(12, 3, False)
|
|
|
|
def test_dns_add_sibling_112_7_days_aging(self):
|
|
self._test_dns_add_sibling(112, 7, True)
|
|
|
|
def test_dns_add_sibling_112_7_days_no_aging(self):
|
|
self._test_dns_add_sibling(112, 7, False)
|
|
|
|
def test_dns_add_sibling_12_113_days_aging(self):
|
|
self._test_dns_add_sibling(12, 113, True)
|
|
|
|
def test_dns_add_sibling_12_113_days_no_aging(self):
|
|
self._test_dns_add_sibling(12, 113, False)
|
|
|
|
def test_dns_add_sibling_0_7_days_aging(self):
|
|
# 1e9 days ago evaluates to 0, i.e static
|
|
self._test_dns_add_sibling(1e9, 7, True)
|
|
|
|
def test_dns_add_sibling_0_7_days_no_aging(self):
    """Run the add-sibling helper with positional arguments (1e9, 7, False).

    NOTE(review): going by the test name, presumably the first two
    values are day counts and False turns aging off -- confirm against
    the helper.
    """
    # 1e9 days ago evaluates to 0, i.e static
    scenario = (1e9, 7, False)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_0_0_days_aging(self):
    """Run the add-sibling helper with positional arguments (1e9, 0, True).

    NOTE(review): going by the test name, presumably the first two
    values are day counts and True turns aging on -- confirm against
    the helper.
    """
    # 1e9 days ago evaluates to 0, i.e static
    scenario = (1e9, 0, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_0_0_days_no_aging(self):
    """Run the add-sibling helper with positional arguments (1e9, 0, False).

    NOTE(review): going by the test name, presumably the first two
    values are day counts and False turns aging off -- confirm against
    the helper.
    """
    # 1e9 days ago evaluates to 0, i.e static
    scenario = (1e9, 0, False)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_10_0_days_aging(self):
    """Run the add-sibling helper with positional arguments (10, 0, True).

    NOTE(review): going by the test name, presumably 10 and 0 are day
    counts and True turns aging on -- confirm against the helper.
    """
    scenario = (10, 0, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_10_0_days_no_aging(self):
    """Run the add-sibling helper with positional arguments (10, 0, False).

    NOTE(review): going by the test name, presumably 10 and 0 are day
    counts and False turns aging off -- confirm against the helper.
    """
    scenario = (10, 0, False)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_2_7_days_aging_touch(self):
    """Run the add-sibling helper with arguments (2, 7, True, True).

    NOTE(review): going by the test name, presumably 2 and 7 are day
    counts, the first True turns aging on and the last True selects the
    touch variant -- confirm against the helper.
    """
    scenario = (2, 7, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_2_7_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (2, 7, False, True).

    NOTE(review): going by the test name, presumably 2 and 7 are day
    counts, False turns aging off and True selects the touch variant
    -- confirm against the helper.
    """
    scenario = (2, 7, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_12_7_days_aging_touch(self):
    """Run the add-sibling helper with arguments (12, 7, True, True).

    NOTE(review): going by the test name, presumably 12 and 7 are day
    counts, the first True turns aging on and the last True selects the
    touch variant -- confirm against the helper.
    """
    scenario = (12, 7, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_12_7_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (12, 7, False, True).

    NOTE(review): going by the test name, presumably 12 and 7 are day
    counts, False turns aging off and True selects the touch variant
    -- confirm against the helper.
    """
    scenario = (12, 7, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_12_3_days_aging_touch(self):
    """Run the add-sibling helper with arguments (12, 3, True, True).

    NOTE(review): going by the test name, presumably 12 and 3 are day
    counts, the first True turns aging on and the last True selects the
    touch variant -- confirm against the helper.
    """
    scenario = (12, 3, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_12_3_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (12, 3, False, True).

    NOTE(review): going by the test name, presumably 12 and 3 are day
    counts, False turns aging off and True selects the touch variant
    -- confirm against the helper.
    """
    scenario = (12, 3, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_112_7_days_aging_touch(self):
    """Run the add-sibling helper with arguments (112, 7, True, True).

    NOTE(review): going by the test name, presumably 112 and 7 are day
    counts, the first True turns aging on and the last True selects the
    touch variant -- confirm against the helper.
    """
    scenario = (112, 7, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_112_7_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (112, 7, False, True).

    NOTE(review): going by the test name, presumably 112 and 7 are day
    counts, False turns aging off and True selects the touch variant
    -- confirm against the helper.
    """
    scenario = (112, 7, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_12_113_days_aging_touch(self):
    """Run the add-sibling helper with arguments (12, 113, True, True).

    NOTE(review): going by the test name, presumably 12 and 113 are day
    counts, the first True turns aging on and the last True selects the
    touch variant -- confirm against the helper.
    """
    scenario = (12, 113, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_12_113_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (12, 113, False, True).

    NOTE(review): going by the test name, presumably 12 and 113 are day
    counts, False turns aging off and True selects the touch variant
    -- confirm against the helper.
    """
    scenario = (12, 113, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_0_7_days_aging_touch(self):
    """Run the add-sibling helper with arguments (1e9, 7, True, True).

    NOTE(review): going by the test name, presumably the first two
    values are day counts, the first True turns aging on and the last
    True selects the touch variant -- confirm against the helper.
    """
    # 1e9 days ago evaluates to 0, i.e static
    scenario = (1e9, 7, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_0_7_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (1e9, 7, False, True).

    NOTE(review): going by the test name, presumably the first two
    values are day counts, False turns aging off and True selects the
    touch variant -- confirm against the helper.
    """
    # 1e9 days ago evaluates to 0, i.e static
    scenario = (1e9, 7, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_0_0_days_aging_touch(self):
    """Run the add-sibling helper with arguments (1e9, 0, True, True).

    NOTE(review): going by the test name, presumably the first two
    values are day counts, the first True turns aging on and the last
    True selects the touch variant -- confirm against the helper.
    """
    # 1e9 days ago evaluates to 0, i.e static
    scenario = (1e9, 0, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_0_0_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (1e9, 0, False, True).

    NOTE(review): going by the test name, presumably the first two
    values are day counts, False turns aging off and True selects the
    touch variant -- confirm against the helper.
    """
    # 1e9 days ago evaluates to 0, i.e static
    scenario = (1e9, 0, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_10_0_days_aging_touch(self):
    """Run the add-sibling helper with arguments (10, 0, True, True).

    NOTE(review): going by the test name, presumably 10 and 0 are day
    counts, the first True turns aging on and the last True selects the
    touch variant -- confirm against the helper.
    """
    scenario = (10, 0, True, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
def test_dns_add_sibling_10_0_days_no_aging_touch(self):
    """Run the add-sibling helper with arguments (10, 0, False, True).

    NOTE(review): going by the test name, presumably 10 and 0 are day
    counts, False turns aging off and True selects the touch variant
    -- confirm against the helper.
    """
    scenario = (10, 0, False, True)
    self._test_dns_add_sibling(*scenario)
|
|
|
|
# Run this module's tests through TestProgram (imported from
# samba.tests.subunitrun), passing the `subunitopts` options object that
# is set up earlier in this file, outside this excerpt.
TestProgram(opts=subunitopts, module=__name__)
|