1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-23 17:34:34 +03:00
samba-mirror/python/samba/tests/dns_aging.py
Douglas Bagnall 0423b0b884 pytest: dns_aging: use assert_timestamps_equal() widely
Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
2021-06-20 23:26:32 +00:00

2170 lines
85 KiB
Python

# Unix SMB/CIFS implementation.
# Copyright (C) Kai Blin <kai@samba.org> 2011
# Copyright (C) Catalyst.NET 2021
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
from samba import dsdb
from samba import dsdb_dns
from samba.ndr import ndr_unpack, ndr_pack
from samba.samdb import SamDB
from samba.auth import system_session
import ldb
from samba import credentials
from samba.dcerpc import dns, dnsp, dnsserver
from samba.dnsserver import TXTRecord
from samba.dnsserver import recbuf_from_string
from samba.tests.subunitrun import SubunitOptions, TestProgram
from samba import werror, WERRORError
from samba.tests.dns_base import DNSTest
import samba.getopt as options
import optparse
import time
parser = optparse.OptionParser(
"dns_aging.py <server name> <server ip> [options]")
sambaopts = options.SambaOptions(parser)
parser.add_option_group(sambaopts)
# use command line creds if available
credopts = options.CredentialsOptions(parser)
parser.add_option_group(credopts)
subunitopts = SubunitOptions(parser)
parser.add_option_group(subunitopts)
opts, args = parser.parse_args()
if len(args) < 2:
parser.print_usage()
sys.exit(1)
LP = sambaopts.get_loadparm()
CREDS = credopts.get_credentials(LP)
SERVER_NAME = args[0]
SERVER_IP = args[1]
CREDS.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
DOMAIN = CREDS.get_realm().lower()
# Unix time start, in DNS timestamp (24 * 365.25 * 369)
# These are ballpark extremes for the timestamp.
DNS_TIMESTAMP_1970 = 3234654
DNS_TIMESTAMP_2101 = 4383000
DNS_TIMESTAMP_1981 = 3333333 # a middling timestamp
IPv4_ADDR = "127.0.0.33"
IPv6_ADDR = "::1"
IPv4_ADDR_2 = "127.0.0.66"
IPv6_ADDR_2 = "1::1"
def get_samdb():
return SamDB(url=f"ldap://{SERVER_IP}",
lp=LP,
session_info=system_session(),
credentials=CREDS)
def get_file_samdb():
# For Samba only direct file access, needed for the tombstoning functions.
# (For Windows, we instruct it to tombstone over RPC).
return SamDB(url=LP.samdb_url(),
lp=LP,
session_info=system_session(),
credentials=CREDS)
def get_rpc():
return dnsserver.dnsserver(f"ncacn_ip_tcp:{SERVER_IP}[sign]", LP, CREDS)
def create_zone(name, rpc=None, aging=True):
if rpc is None:
rpc = get_rpc()
z = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
z.pszZoneName = name
z.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
z.fAging = int(bool(aging))
z.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
z.fDsIntegrated = 1
z.fLoadExisting = 1
z.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
SERVER_IP,
None,
0,
'ZoneCreate',
dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
z)
def delete_zone(name, rpc=None):
if rpc is None:
rpc = get_rpc()
rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
SERVER_IP,
name,
0,
'DeleteZoneFromDs',
dnsserver.DNSSRV_TYPEID_NULL,
None)
def txt_s_list(txt):
"""Construct a txt record string list, which is a fiddly matter."""
if isinstance(txt, str):
txt = [txt]
s_list = dnsp.string_list()
s_list.count = len(txt)
s_list.str = txt
return s_list
def make_txt_record(txt):
r = dns.txt_record()
r.txt = txt_s_list(txt)
return r
def copy_rec(rec):
copy = dnsserver.DNS_RPC_RECORD()
copy.wType = rec.wType
copy.dwFlags = rec.dwFlags
copy.dwSerial = rec.dwSerial
copy.dwTtlSeconds = rec.dwTtlSeconds
copy.data = rec.data
copy.dwTimeStamp = rec.dwTimeStamp
return copy
def guess_wtype(data):
if isinstance(data, list):
data = make_txt_record(data)
return (data, dnsp.DNS_TYPE_TXT)
if ":" in data:
return (data, dnsp.DNS_TYPE_AAAA)
return (data, dnsp.DNS_TYPE_A)
class TestDNSAging(DNSTest):
"""Probe DNS aging and scavenging, using LDAP and RPC to set and test
the timestamps behind DNS's back."""
server = SERVER_NAME
server_ip = SERVER_IP
creds = CREDS
def setUp(self):
super().setUp()
self.rpc_conn = get_rpc()
self.samdb = get_samdb()
# We always have a zone of our own named after the test function.
self.zone = self.id().rsplit('.', 1)[1]
self.addCleanup(delete_zone, self.zone, self.rpc_conn)
try:
create_zone(self.zone, self.rpc_conn)
except WERRORError as e:
if e.args[0] != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
raise
print(f"zone {self.zone} already exists")
# Though we set this in create_zone(), that doesn't work on
# Windows, so we repeat again here.
self.set_zone_int_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
self.zone_dn = (f"DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
f"{self.samdb.get_default_basedn()}")
def set_zone_int_params(self, zone=None, **kwargs):
"""Keyword arguments set parameters on the zone. e.g.:
self.set_zone_int_params(Aging=1,
RefreshInterval=222)
See [MS-DNSP] 3.1.1.2.1 "DNS Zone Integer Properties" for names.
"""
if zone is None:
zone = self.zone
for key, val in kwargs.items():
name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
name_param.dwParam = val
name_param.pszNodeName = key
try:
self.rpc_conn.DnssrvOperation2(
dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
SERVER_IP,
zone,
0,
'ResetDwordProperty',
dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
name_param)
except WERRORError as e:
self.fail(str(e))
def rpc_replace(self, name, old=None, new=None):
"""Replace a DNS_RPC_RECORD or DNS_RPC_RECORD_BUF"""
# wrap our recs, if necessary
if isinstance(new, dnsserver.DNS_RPC_RECORD):
rec = new
new = dnsserver.DNS_RPC_RECORD_BUF()
new.rec = rec
if isinstance(old, dnsserver.DNS_RPC_RECORD):
rec = old
old = dnsserver.DNS_RPC_RECORD_BUF()
old.rec = rec
try:
self.rpc_conn.DnssrvUpdateRecord2(
dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
SERVER_IP,
self.zone,
name,
new,
old)
except WERRORError as e:
self.fail(f"could not replace record ({e})")
def rpc_add(self, name, data, wtype):
rec_buf = recbuf_from_string(wtype, data)
self.rpc_replace(name, None, rec_buf)
def rpc_delete(self, name, data, wtype):
rec_buf = recbuf_from_string(wtype, data)
self.rpc_replace(name, rec_buf, None)
def get_unique_txt_record(self, name, txt):
"""Get the TXT record on Name with value txt, asserting that there is
only one."""
if isinstance(txt, str):
txt = [txt]
recs = self.ldap_get_records(name)
match = None
for r in recs:
if r.wType != dnsp.DNS_TYPE_TXT:
continue
txt2 = [x for x in r.data.str]
if txt2 == txt:
self.assertIsNone(match)
match = r
return match
def get_unique_ip_record(self, name, addr, wtype=None):
"""Get an A or AAAA record on name with the matching data."""
if wtype is None:
addr, wtype = guess_wtype(addr)
recs = self.ldap_get_records(name)
# We need to use the internal dns_record_match because not all
# forms always match on strings (e.g. IPv6)
rec = dnsp.DnssrvRpcRecord()
rec.wType = wtype
rec.data = addr
match = None
for r in recs:
if dsdb_dns.records_match(r, rec):
self.assertIsNone(match)
match = r
return match
def dns_query(self, name, qtype=dns.DNS_QTYPE_ALL):
"""make a query, which might help Windows notice LDAP changes"""
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
fullname = "%s.%s" % (name, self.zone)
q = self.make_name_question(fullname, qtype, dns.DNS_QCLASS_IN)
self.finish_name_packet(p, [q])
r, rp = self.dns_transaction_udp(p, host=SERVER_IP)
return r
def dns_update_non_text(self, name,
data,
wtype=None,
qclass=dns.DNS_QCLASS_IN):
if wtype is None:
data, wtype = guess_wtype(data)
if qclass == dns.DNS_QCLASS_IN:
ttl = 123
else:
ttl = 0
fullname = "%s.%s" % (name, self.zone)
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
u = self.make_name_question(self.zone,
dns.DNS_QTYPE_SOA,
dns.DNS_QCLASS_IN)
self.finish_name_packet(p, [u])
r = dns.res_rec()
r.name = fullname
r.rr_type = wtype
r.rr_class = qclass
r.ttl = ttl
r.length = 0xffff
r.rdata = data
p.nscount = 1
p.nsrecs = [r]
(code, response) = self.dns_transaction_udp(p, host=SERVER_IP)
self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
return response
def dns_update_record(self, name, txt, ttl=900):
if isinstance(txt, str):
txt = [txt]
p = self.make_txt_update(name, txt, self.zone, ttl=ttl)
(code, response) = self.dns_transaction_udp(p, host=SERVER_IP)
self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
return self.get_unique_txt_record(name, txt)
def rpc_update_record(self, name, txt, **kwargs):
"""Add the record that self.dns_update_record() would add, via the
dnsserver RPC pipe.
As with DNS update, if the record already exists, we replace it.
"""
if isinstance(txt, str):
txt = [txt]
old = TXTRecord(txt)
rec = TXTRecord(txt)
for k, v in kwargs.items():
setattr(rec, k, v)
try:
self.rpc_replace(name, old, rec)
except AssertionError as e:
# we have caught and wrapped the WERRor inside
if 'WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST' not in str(e):
raise
self.rpc_replace(name, None, rec)
return self.get_unique_txt_record(name, txt)
def get_one_node(self, name):
expr = f"(&(objectClass=dnsNode)(name={name}))"
nodes = self.samdb.search(base=self.zone_dn,
scope=ldb.SCOPE_SUBTREE,
expression=expr,
attrs=["dnsRecord", "dNSTombstoned", "name"])
if len(nodes) > 1:
self.fail(
f"expected 0 or 1 dnsNodes for {name}, found {len(nodes)}")
if len(nodes) == 0:
return None
return nodes[0]
def ldap_get_records(self, name):
node = self.get_one_node(name)
if node is None:
return []
records = node.get('dnsRecord')
return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records]
def assert_tombstoned(self, name, tombstoned=True, timestamp=None):
# If run with tombstoned=False, assert it isn't tombstoned
# (and has no traces of tombstone). Otherwise assert it has
# all the necessary bits.
node = self.get_one_node(name)
if node is None:
self.fail(f"no node named {name}")
dnsts = node.get("dNSTombstoned")
if dnsts is None:
is_tombstoned = False
else:
self.assertEqual(len(dnsts), 1)
if dnsts[0] == b'TRUE':
is_tombstoned = True
else:
is_tombstoned = False
if tombstoned != is_tombstoned:
if is_tombstoned:
self.fail(f"{name} is tombstoned")
else:
self.fail(f"{name} is not tombstoned")
recs = self.ldap_get_records(name)
if is_tombstoned:
self.assertEqual(len(recs), 1)
self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
if timestamp is None:
self.assert_nttime_in_hour_range(recs[0].data)
else:
self.assert_nttime_in_hour_range(recs[0].data,
timestamp - 3,
timestamp + 3)
else:
for r in recs:
self.assertNotEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
def ldap_replace_records(self, name, records):
# We use raw ldap to avoid the "helpfulness" of dsdb_dns.replace()
dn = f'DC={name},{self.zone_dn}'
msg = ldb.Message.from_dict(self.samdb,
{'dn': dn,
'dnsRecord': [ndr_pack(r) for r in records]
},
ldb.FLAG_MOD_REPLACE)
try:
self.samdb.modify(msg)
except ldb.LdbError as e:
if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]:
raise
# We need to do an add
msg["objectClass"] = ["top", "dnsNode"]
msg["dnsRecord"].set_flags(ldb.FLAG_MOD_ADD)
self.samdb.add(msg)
def ldap_update_core(self, name, wtype, data, **kwargs):
"""This one is not TXT specific."""
records = self.ldap_get_records(name)
# default values
rec = dnsp.DnssrvRpcRecord()
rec.wType = wtype
rec.rank = dnsp.DNS_RANK_ZONE
rec.dwTtlSeconds = 900
rec.dwSerial = 110
rec.dwTimeStamp = 0
rec.data = data
# override defaults, as required
for k, v in kwargs.items():
setattr(rec, k, v)
for i, r in enumerate(records[:]):
if dsdb_dns.records_match(r, rec):
records[i] = rec
break
else: # record not found
records.append(rec)
self.ldap_replace_records(name, records)
return rec
def ldap_update_record(self, name, txt, **kwargs):
"""Add the record that self.dns_update_record() would add, via ldap,
thus allowing us to set additional dnsRecord features like
dwTimestamp.
"""
rec = self.ldap_update_core(name,
dnsp.DNS_TYPE_TXT,
txt_s_list(txt),
**kwargs)
recs = self.ldap_get_records(name)
match = None
for r in recs:
if r.wType != rec.wType:
continue
if r.data.str == rec.data.str:
self.assertIsNone(match, f"duplicate records for {name}")
match = r
self.assertEqual(match.rank, rec.rank & 255)
self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds)
self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp)
return match
def ldap_delete_record(self, name, data, wtype=dnsp.DNS_TYPE_TXT):
rec = dnsp.DnssrvRpcRecord()
if wtype == dnsp.DNS_TYPE_TXT:
data = txt_s_list(data)
rec.wType = wtype
rec.data = data
records = self.ldap_get_records(name)
for i, r in enumerate(records[:]):
if dsdb_dns.records_match(r, rec):
del records[i]
break
else:
self.fail(f"record {data} not found")
self.ldap_replace_records(name, records)
def add_ip_record(self, name, addr, wtype=None, **kwargs):
if wtype is None:
addr, wtype = guess_wtype(addr)
rec = self.ldap_update_core(name,
wtype,
addr,
**kwargs)
recs = self.ldap_get_records(name)
match = None
for r in recs:
if dsdb_dns.records_match(r, rec):
self.assertIsNone(match, f"duplicate records for {name}")
match = r
self.assertEqual(match.rank, rec.rank & 255)
self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds)
self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp)
return match
def ldap_modify_timestamps(self, name, delta):
records = self.ldap_get_records(name)
for rec in records:
rec.dwTimeStamp += delta
self.ldap_replace_records(name, records)
def get_rpc_records(self, name, dns_type=None):
if dns_type is None:
dns_type = dnsp.DNS_TYPE_ALL
select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
buflen, res = self.rpc_conn.DnssrvEnumRecords2(
dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
SERVER_IP,
self.zone,
name,
None,
dns_type,
select_flags,
None,
None)
recs = []
if not res or res.count == 0:
return []
for rec in res.rec:
recs.extend(rec.records)
return recs
def dns_tombstone(self, name,
epoch_hours=DNS_TIMESTAMP_1981,
epoch_nttime=None):
dn = f'DC={name},{self.zone_dn}'
r = dnsp.DnssrvRpcRecord()
r.wType = dnsp.DNS_TYPE_TOMBSTONE
# r.dwTimeStamp is a 32 bit value in hours, and r.data is an
# NTTIME (100 nanosecond intervals), both in the 1601 epoch. A
# tombstome will have both, but expiration calculations use
# the r.data NTTIME EntombedTime timestamp (see [MS-DNSP]).
r.dwTimeStamp = epoch_hours
if epoch_nttime is None:
r.data = epoch_hours * 3600 * 10 * 1000 * 1000
else:
r.data = epoch_nttime
msg = ldb.Message.from_dict(self.samdb,
{'dn': dn,
'dnsRecord': [ndr_pack(r)],
'dnsTombstoned': 'TRUE'
},
ldb.FLAG_MOD_REPLACE)
try:
self.samdb.modify(msg)
except ldb.LdbError as e:
if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]:
raise
# We need to do an add
msg["objectClass"] = ["top", "dnsNode"]
self.samdb.add(msg)
def set_aging(self, enable=False):
self.set_zone_int_params(Aging=int(bool(enable)))
def assert_timestamp_in_ballpark(self, rec):
self.assertGreater(rec.dwTimeStamp, DNS_TIMESTAMP_1970)
self.assertLess(rec.dwTimeStamp, DNS_TIMESTAMP_2101)
def assert_nttime_in_hour_range(self, t,
hour_min=DNS_TIMESTAMP_1970,
hour_max=DNS_TIMESTAMP_2101):
t //= int(3600 * 1e7)
self.assertGreater(t, hour_min)
self.assertLess(t, hour_max)
def assert_soon_after(self, timestamp, reference):
"""Assert that a timestamp is the same or very slightly higher than a
reference timestamp.
Typically we expect the timestamps to be identical, unless an
hour has clicked over since the reference was taken. However
we allow one more hour in case it happens during a daylight
savings transition or something.
"""
if hasattr(timestamp, 'dwTimeStamp'):
timestamp = timestamp.dwTimeStamp
if hasattr(reference, 'dwTimeStamp'):
reference = reference.dwTimeStamp
diff = timestamp - reference
days = abs(diff / 24.0)
if diff < 0:
msg = f"timestamp is {days} days ({abs(diff)} hours) before reference"
elif diff > 2:
msg = f"timestamp is {days} days ({diff} hours) after reference"
else:
return
raise AssertionError(msg)
def assert_timestamps_equal(self, ts1, ts2):
"""Just like assertEqual(), but tells us the difference, not the
absolute values. e.g:
self.assertEqual(a, b)
AssertionError: 3685491 != 3685371
self.assert_timestamps_equal(a, b)
AssertionError: -120 (first is 5.0 days earlier than second)
Also, we turn a record into a timestamp if we need
"""
if hasattr(ts1, 'dwTimeStamp'):
ts1 = ts1.dwTimeStamp
if hasattr(ts2, 'dwTimeStamp'):
ts2 = ts2.dwTimeStamp
if ts1 == ts2:
return
diff = ts1 - ts2
days = abs(diff / 24.0)
if ts1 == 0 or ts2 == 0:
# when comparing to zero we don't want the number of days.
msg = f"timestamp {ts1} != {ts2}"
elif diff > 0:
msg = f"{ts1} is {days} days ({diff} hours) after {ts2}"
else:
msg = f"{ts1} is {days} days ({abs(diff)} hours) before {ts2}"
raise AssertionError(msg)
def test_update_timestamps_aging_off_then_on(self):
# we will add a record with aging off
# it will have the current timestamp
self.set_aging(False)
name = 'timestamp-now'
name2 = 'timestamp-eightdays'
rec = self.dns_update_record(name, [name])
start_time = rec.dwTimeStamp
self.assert_timestamp_in_ballpark(rec)
# alter the timestamp -8 days using RPC
# with aging turned off, we expect no change
# when aging is on, we expect change
eight_days_ago = start_time - 8 * 24
rec = self.ldap_update_record(name2, [name2],
dwTimeStamp=eight_days_ago)
self.assert_timestamps_equal(rec.dwTimeStamp, eight_days_ago)
# if aging was on, this would change
rec = self.dns_update_record(name2, [name2])
self.assert_timestamps_equal(rec.dwTimeStamp, eight_days_ago)
self.set_aging(True)
rec = self.dns_update_record(name2, [name2])
self.assertGreaterEqual(rec.dwTimeStamp, start_time)
def test_rpc_update_timestamps(self):
# RPC always sets timestamps to zero on Windows.
self.set_aging(False)
name = 'timestamp-now'
rec = self.dns_update_record(name, [name])
start_time = rec.dwTimeStamp
self.assert_timestamp_in_ballpark(rec)
# attempt to alter the timestamp to something close by.
eight_days_ago = start_time - 8 * 24
rec = self.rpc_update_record(name, [name],
dwTimeStamp=eight_days_ago)
self.assertEqual(rec.dwTimeStamp, 0)
# try again, with aging on
self.set_aging(True)
rec = self.rpc_update_record(name, [name],
dwTimeStamp=eight_days_ago)
self.assertEqual(rec.dwTimeStamp, 0)
# now that the record is static, a dns update won't change it
rec = self.dns_update_record(name, [name])
self.assertEqual(rec.dwTimeStamp, 0)
# but another record on the same node will behave normally
# i.e. the node is not static, the record is.
name2 = 'timestamp-eightdays'
rec = self.dns_update_record(name2, [name2])
self.assert_soon_after(rec.dwTimeStamp,
start_time)
def get_txt_timestamps(self, name, *txts):
records = self.ldap_get_records(name)
ret = []
for t in txts:
for r in records:
t2 = [x for x in r.data.str]
if t == t2:
ret.append(r.dwTimeStamp)
return ret
def test_update_aging_disabled_2(self):
# With aging disabled, Windows updates the timestamps of all
# records when one is updated.
name = 'test'
txt1 = ['test txt']
txt2 = ['test', 'txt2']
txt3 = ['test', 'txt3']
self.set_aging(False)
current_time = self.dns_update_record(name, txt1).dwTimeStamp
six_days_ago = current_time - 6 * 24
eight_days_ago = current_time - 8 * 24
fifteen_days_ago = current_time - 15 * 24
hundred_days_ago = current_time - 100 * 24
thousand_days_ago = current_time - 1000 * 24
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago):
# wind back
self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
self.assertEqual(self.get_txt_timestamps(name, txt1), [timestamp])
# no change here
update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(update_timestamp, timestamp)
# adding a fresh record
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago,
100000,
100):
# wind back
timestamp1 = self.ldap_update_record(
name,
txt1,
dwTimeStamp=timestamp).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp)
self.dns_update_record(name, txt2)
timestamps = self.get_txt_timestamps(name, txt1, txt2)
self.assertEqual(timestamps, [timestamp, current_time])
self.ldap_delete_record(name, txt2)
timestamps = self.get_txt_timestamps(name, txt1)
self.assertEqual(timestamps, [timestamp])
# add record 2.
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago,
100000,
100):
# wind back
self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp)
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
# txt1 timestamp is now current time
timestamps = self.get_txt_timestamps(name, txt1, txt2)
self.assertEqual(timestamps, [timestamp, current_time])
# with 3 records, no change
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago,
100000,
10):
# wind back
self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
self.ldap_update_record(name, txt2, dwTimeStamp=timestamp)
self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp3, timestamp + 30)
self.dns_update_record(name, txt2).dwTimeStamp
timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
self.assertEqual(timestamps, [timestamp,
timestamp,
timestamp + 30])
# with 3 records, one of which is static
# first we set the updatee's timestamp to a recognisable number
self.ldap_update_record(name, txt2, dwTimeStamp=999999)
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago,
100000,
10):
# wind back
self.ldap_update_record(name, txt1, dwTimeStamp=0)
self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp - 9))
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp3, timestamp - 9)
self.dns_update_record(name, txt2)
timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
self.assertEqual(timestamps, [0,
999999,
timestamp - 9])
# with 3 records, updating one which is static
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago,
100000,
10):
# wind back
self.ldap_update_record(name, txt1, dwTimeStamp=0)
self.ldap_update_record(name, txt2, dwTimeStamp=0)
self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp3, timestamp + 30)
self.dns_update_record(name, txt2).dwTimeStamp
timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
self.assertEqual(timestamps, [0,
0,
timestamp + 30])
# with 3 records, after the static nodes have been replaced
self.ldap_update_record(name, txt1, dwTimeStamp=777777)
self.ldap_update_record(name, txt2, dwTimeStamp=888888)
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago,
100000,
10):
# wind back
self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp))
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp3, timestamp)
self.dns_update_record(name, txt2)
timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
self.assertEqual(timestamps, [777777,
888888,
timestamp])
def broken_test_update_aging_disabled_rpc(self):
# This one doesn't work reliably on Windows because there is a
# race between RPC and ldap.
name = 'test'
txt1 = ['test txt']
txt2 = ['test', 'txt2']
txt3 = ['test', 'txt3']
self.set_aging(False)
current_time = self.dns_update_record(name, txt1).dwTimeStamp
six_days_ago = current_time - 6 * 24
eight_days_ago = current_time - 8 * 24
fifteen_days_ago = current_time - 15 * 24
hundred_days_ago = current_time - 100 * 24
thousand_days_ago = current_time - 1000 * 24
# with 3 records, rpc updates
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
for timestamp in (current_time,
six_days_ago,
eight_days_ago,
fifteen_days_ago,
hundred_days_ago,
thousand_days_ago,
100000,
10):
# wind back
self.ldap_update_record(name, txt1, dwTimeStamp=777777)
self.ldap_update_record(name, txt2, dwTimeStamp=888888)
self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp))
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp3, timestamp)
self.rpc_update_record(name, txt2)
time.sleep(2)
timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
self.assertEqual(timestamps, [777777,
0,
timestamp])
def _test_update_aging_disabled_n_days_ago(self, n_days):
name = 'test'
txt1 = ['1']
txt2 = ['2']
self.set_aging(False)
current_time = self.dns_update_record(name, txt1).dwTimeStamp
# rewind timestamp using ldap
self.ldap_modify_timestamps(name, n_days * -24)
n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assertGreater(current_time, n_days_ago)
# no change when updating this record
update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(update_timestamp, n_days_ago)
# add another record, which should have the current timestamp
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
# get the original record timestamp. NOW it matches current_time
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp2)
# let's repeat that, this time with txt2 existing
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, n_days_ago)
# this update is not an add
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
# now timestamp1 is not changed
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, n_days_ago)
# delete record2, try again
self.ldap_delete_record(name, txt2)
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, n_days_ago)
# here we are re-adding the deleted record
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
# It gets weird HERE.
# note how the SIBLING of the deleted, re-added record differs
# from the sibling of freshly added record, depending on the
# time difference.
if n_days <= 7:
self.assert_timestamps_equal(timestamp1, n_days_ago)
else:
self.assert_timestamps_equal(timestamp1, timestamp2)
# re-timestamp record2, try again
self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, n_days_ago)
# no change
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp2, n_days_ago)
# also no change
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp2)
# let's introduce another record
txt3 = ['3']
self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp3, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
if n_days <= 7:
self.assert_timestamps_equal(timestamp1, n_days_ago)
else:
self.assert_timestamps_equal(timestamp1, timestamp3)
self.assert_timestamps_equal(timestamp2, timestamp3)
self.ldap_delete_record(name, txt3)
timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp3, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
if n_days <= 7:
self.assert_timestamps_equal(timestamp1, n_days_ago)
else:
self.assert_timestamps_equal(timestamp1, timestamp3)
self.assert_timestamps_equal(timestamp2, timestamp3)
# and here we'll make txt3 static
txt4 = ['4']
# and here we'll make txt1 static
self.ldap_update_record(name, txt1, dwTimeStamp=0)
self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
self.assertEqual(timestamp1, 0)
self.assert_timestamps_equal(timestamp2, n_days_ago)
self.assert_timestamps_equal(timestamp3, n_days_ago)
self.assert_soon_after(timestamp4, current_time)
def test_update_aging_disabled_in_no_refresh_window(self):
self._test_update_aging_disabled_n_days_ago(4)
def test_update_aging_disabled_on_no_refresh_boundary(self):
self._test_update_aging_disabled_n_days_ago(7)
def test_update_aging_disabled_in_refresh_window(self):
self._test_update_aging_disabled_n_days_ago(9)
def test_update_aging_disabled_beyond_refresh_window(self):
self._test_update_aging_disabled_n_days_ago(16)
def test_update_aging_disabled_in_eighteenth_century(self):
self._test_update_aging_disabled_n_days_ago(100000)
def test_update_aging_disabled_static(self):
name = 'test'
txt1 = ['1']
txt2 = ['2']
self.set_aging(False)
current_time = self.dns_update_record(name, txt1).dwTimeStamp
self.ldap_update_record(name, txt1, dwTimeStamp=0)
# no change when updating this record
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assertEqual(timestamp1, 0)
# add another record, which should have the current timestamp
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_soon_after(timestamp1, current_time)
# let's repeat that, this time with txt2 existing
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
# delete record2, try again
self.ldap_delete_record(name, txt2)
self.ldap_update_record(name, txt1, dwTimeStamp=0)
# no change when updating this record
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assertEqual(timestamp1, 0)
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assertEqual(timestamp2, 0)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assertEqual(timestamp1, 0)
# re-timestamp record2, try again
self.ldap_update_record(name, txt2, dwTimeStamp=1)
self.ldap_update_record(name, txt1, dwTimeStamp=0)
# no change when updating this record
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp2, 1)
def test_update_aging_disabled(self):
# With aging disabled, Windows updates the timestamps of all
# records when one is updated.
name = 'test'
txt1 = ['test txt']
txt2 = ['test', 'txt2']
txt3 = ['test', 'txt3']
minus_6 = -6 * 24
minus_8 = -8 * 24
self.set_aging(False)
current_time = self.dns_update_record(name, txt1).dwTimeStamp
# rewind timestamp using ldap
self.ldap_modify_timestamps(name, minus_6)
after_mod = self.get_unique_txt_record(name, txt1)
six_days_ago = after_mod.dwTimeStamp
self.assert_timestamps_equal(six_days_ago, current_time + minus_6)
# no change
update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(update_timestamp, six_days_ago)
self.check_query_txt(name, txt1, zone=self.zone)
# another record
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
# without aging, timestamp1 is changed!!
self.assert_timestamps_equal(timestamp1, timestamp2)
# Set both records back to 8 days ago.
self.ldap_modify_timestamps(name, minus_8)
eight_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(eight_days_ago, current_time + minus_8)
update2 = self.dns_update_record(name, txt2)
# Without aging on, an update should not change the timestamps.
self.assert_timestamps_equal(update2.dwTimeStamp, eight_days_ago)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, eight_days_ago)
# Add another txt record. The new record should have the now
# timestamp, and drag the others up with it.
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp3, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp3)
self.assert_timestamps_equal(timestamp2, timestamp3)
hundred_days_ago = current_time - 100 * 24
thousand_days_ago = current_time - 1000 * 24
record = self.ldap_update_record(name, txt1,
dwTimeStamp=hundred_days_ago)
self.assert_timestamps_equal(record.dwTimeStamp, hundred_days_ago)
record = self.ldap_update_record(name, txt2,
dwTimeStamp=thousand_days_ago)
self.assert_timestamps_equal(record.dwTimeStamp, thousand_days_ago)
# update 3, will others change (because beyond RefreshInterval)? yes.
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp3, current_time)
self.assert_timestamps_equal(timestamp1, hundred_days_ago)
self.assert_timestamps_equal(timestamp2, thousand_days_ago)
fifteen_days_ago = current_time - 15 * 24
self.ldap_update_record(name, txt3, dwTimeStamp=fifteen_days_ago)
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
# DNS update has no effect because all records are old
self.assert_timestamps_equal(timestamp2, thousand_days_ago)
self.assert_timestamps_equal(timestamp1, hundred_days_ago)
self.assert_timestamps_equal(timestamp3, fifteen_days_ago)
# Does update of old record affect timestamp of refreshable record? No.
self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
# DNS update has no effect because all records are old
self.assert_timestamps_equal(timestamp2, thousand_days_ago)
self.assert_timestamps_equal(timestamp1, hundred_days_ago)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# RPC zeros timestamp, after which updates won't change it.
# BUT it refreshes all others!
self.rpc_update_record(name, txt2)
timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assertEqual(timestamp2, 0)
self.assert_soon_after(timestamp1, current_time)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
def test_update_aging_enabled(self):
name = 'test'
txt1 = ['test txt']
txt2 = ['test', 'txt2']
txt3 = ['test', 'txt3']
txt4 = ['4']
self.set_aging(True)
current_time = self.dns_update_record(name, txt2).dwTimeStamp
six_days_ago = current_time - 6 * 24
eight_days_ago = current_time - 8 * 24
fifteen_days_ago = current_time - 15 * 24
hundred_days_ago = current_time - 100 * 24
self.ldap_update_record(name, txt1, dwTimeStamp=six_days_ago)
# with or without aging, a delta of -6 days does not affect
# timestamps, because dwNoRefreshInterval is 7 days.
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp1, six_days_ago)
self.assert_soon_after(timestamp2, current_time)
self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# update 1, what happens to 2 and 3? Nothing?
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp1, six_days_ago)
self.assert_soon_after(timestamp2, current_time)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# now set 1 to 8 days, and we should see changes
self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago)
# update 1, what happens to 2 and 3? Nothing?
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp1, current_time)
self.assert_soon_after(timestamp2, current_time)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# next few ones use these numbers
self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago)
self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago)
self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
# change even though 1 is outside the window
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp1, current_time)
self.assert_timestamps_equal(timestamp2, six_days_ago)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# reset 1
self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago)
# no change, because 2 is outside the window
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp1, fifteen_days_ago)
self.assert_timestamps_equal(timestamp2, six_days_ago)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# 3 changes, others do not
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp1, fifteen_days_ago)
self.assert_timestamps_equal(timestamp2, six_days_ago)
self.assert_soon_after(timestamp3, current_time)
# reset 3 to 100 days
self.ldap_update_record(name, txt3, dwTimeStamp=hundred_days_ago)
# 3 changes, others do not
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp1, fifteen_days_ago)
self.assert_timestamps_equal(timestamp2, six_days_ago)
self.assert_soon_after(timestamp3, current_time)
# reset 1 and 3 to 8 days. does update of 1 affect 3?
self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago)
self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
# 1 changes, others do not
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp1, current_time)
self.assert_timestamps_equal(timestamp2, six_days_ago)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# Try an RPC update, zeroing 1 --> what happens to 3?
timestamp1 = self.rpc_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assertEqual(timestamp1, 0)
self.assert_timestamps_equal(timestamp2, six_days_ago)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
# with 2 and 3 at 8 days, does static record change things?
self.ldap_update_record(name, txt2, dwTimeStamp=eight_days_ago)
# 2 changes, but to zero!
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp1, 0)
self.assert_timestamps_equal(timestamp2, 0)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago)
self.ldap_update_record(name, txt1, dwTimeStamp=3000000)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, 3000000)
# dns update remembers that node is static, even with no
# static records.
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assertEqual(timestamp1, 0)
# Add another txt record. The new record should have the now
# timestamp, and the others should remain unchanged.
# BUT somehow record 1 is static!?
timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
self.assert_timestamps_equal(timestamp1, 0)
self.assert_timestamps_equal(timestamp2, six_days_ago)
self.assert_timestamps_equal(timestamp3, eight_days_ago)
self.assert_timestamps_equal(timestamp4, 0)
def _test_update_aging_enabled_n_days_ago(self, n_days):
name = 'test'
txt1 = ['1']
txt2 = ['2']
delta = n_days * -24
self.set_aging(True)
current_time = self.dns_update_record(name, txt1).dwTimeStamp
# rewind timestamp using ldap
self.ldap_modify_timestamps(name, delta)
n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assertGreater(current_time, n_days_ago)
# update changes timestamp depending on time.
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
if n_days <= 7:
self.assert_timestamps_equal(timestamp1, n_days_ago)
else:
self.assert_soon_after(timestamp1, current_time)
# add another record, which should have the current timestamp
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
# first record should not have changed
timestamp1_b = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp1_b)
# let's repeat that, this time with txt2 existing
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp1_b)
# this update is not an add. record 2 is already up-to-date
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
# now timestamp1 is not changed
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp1_b)
# delete record2, try again
self.ldap_delete_record(name, txt2)
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
if n_days <= 7:
self.assert_timestamps_equal(timestamp1, n_days_ago)
else:
self.assert_soon_after(timestamp1, current_time)
# here we are re-adding the deleted record
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_soon_after(timestamp2, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
# It gets weird HERE.
# note how the SIBLING of the deleted, re-added record differs
# from the sibling of freshly added record, depending on the
# time difference.
if n_days <= 7:
self.assert_timestamps_equal(timestamp1, n_days_ago)
else:
self.assert_timestamps_equal(timestamp1, timestamp2)
# re-timestamp record2, try again
self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
# this should make no difference
timestamp1_b = self.dns_update_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp1_b)
# no change
timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp2, timestamp1)
# also no change
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, timestamp2)
# let's introduce another record
txt3 = ['3']
self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp3, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp1, n_days_ago)
self.assert_timestamps_equal(timestamp2, n_days_ago)
self.ldap_delete_record(name, txt3)
timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp
self.assert_soon_after(timestamp3, current_time)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
self.assert_timestamps_equal(timestamp1, n_days_ago)
self.assert_timestamps_equal(timestamp2, n_days_ago)
txt4 = ['4']
# Because txt1 is static, txt4 is static
self.ldap_update_record(name, txt1, dwTimeStamp=0)
self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
self.assert_timestamps_equal(timestamp1, 0)
self.assert_timestamps_equal(timestamp2, n_days_ago)
self.assert_timestamps_equal(timestamp3, n_days_ago)
self.assert_timestamps_equal(timestamp4, 0)
longer_ago = n_days_ago // 2
# remove all static records.
self.ldap_delete_record(name, txt4)
self.ldap_update_record(name, txt1, dwTimeStamp=longer_ago)
self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
self.assert_timestamps_equal(timestamp1, longer_ago)
timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
# Here, although there is no record frm which to get the zero
# timestamp, record 4 does it anyway.
self.assert_timestamps_equal(timestamp1, longer_ago)
self.assert_timestamps_equal(timestamp2, n_days_ago)
self.assert_timestamps_equal(timestamp3, n_days_ago)
self.assert_timestamps_equal(timestamp4, 0)
# and now record 1 wants to be static.
self.ldap_update_record(name, txt4, dwTimeStamp=longer_ago)
timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp
self.assert_timestamps_equal(timestamp4, longer_ago)
timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp
self.assert_timestamps_equal(timestamp1, 0)
self.assert_timestamps_equal(timestamp4, longer_ago)
def test_update_aging_enabled_in_no_refresh_window(self):
self._test_update_aging_enabled_n_days_ago(4)
def test_update_aging_enabled_on_no_refresh_boundary(self):
self._test_update_aging_enabled_n_days_ago(7)
def test_update_aging_enabled_in_refresh_window(self):
self._test_update_aging_enabled_n_days_ago(9)
def test_update_aging_enabled_beyond_refresh_window(self):
self._test_update_aging_enabled_n_days_ago(16)
def test_update_aging_enabled_in_eighteenth_century(self):
self._test_update_aging_enabled_n_days_ago(100000)
def test_update_static_stickiness(self):
name = 'test'
A = ['A']
B = ['B']
C = ['C']
D = ['D']
self.set_aging(False)
self.dns_update_record(name, A).dwTimeStamp
self.ldap_update_record(name, B, dwTimeStamp=0)
self.dns_update_record(name, B)
self.dns_update_record(name, C)
ctime = self.get_unique_txt_record(name, C).dwTimeStamp
self.assertEqual(ctime, 0)
btime = self.get_unique_txt_record(name, B).dwTimeStamp
self.assertEqual(btime, 0)
self.ldap_replace_records(name, [])
self.dns_update_record(name, D)
dtime = self.get_unique_txt_record(name, D).dwTimeStamp
self.assertEqual(dtime, 0)
def _test_update_timestamp_weirdness(self, n_days, aging=True):
name = 'test'
A = ['A']
B = ['B']
self.set_aging(aging)
current_time = self.dns_update_record(name, A).dwTimeStamp
# rewind timestamp using ldap
self.ldap_modify_timestamps(name, n_days * -24)
n_days_ago = self.get_unique_txt_record(name, A).dwTimeStamp
time_A = self.dns_update_record(name, A).dwTimeStamp
# that dns_update should have reset the timestamp ONLY if
# aging is on and the old timestamp is > noRefresh period (7
# days)
if n_days > 7 and aging:
self.assert_soon_after(time_A, current_time)
else:
self.assert_timestamps_equal(time_A, n_days_ago)
# add another record, which should have the current timestamp
time_B = self.dns_update_record(name, B).dwTimeStamp
self.assert_soon_after(time_B, current_time)
time_A = self.get_unique_txt_record(name, A).dwTimeStamp
if aging and n_days <= 7:
self.assert_timestamps_equal(time_A, n_days_ago)
else:
self.assert_soon_after(time_A, current_time)
# delete B, try again
self.ldap_delete_record(name, B)
self.ldap_update_record(name, A, dwTimeStamp=n_days_ago)
time_A = self.dns_update_record(name, A).dwTimeStamp
# here we are re-adding the deleted record
time_B = self.dns_update_record(name, B).dwTimeStamp
self.assert_soon_after(time_B, current_time)
time_A = self.get_unique_txt_record(name, A).dwTimeStamp
return n_days_ago, time_A, time_B
def test_update_timestamp_weirdness_no_refresh_no_aging(self):
n_days_ago, time_A, time_B = \
self._test_update_timestamp_weirdness(5, False)
# the timestamp of the SIBLING of the deleted, re-added record
# differs from the sibling of freshly added record.
self.assert_timestamps_equal(time_A, n_days_ago)
def test_update_timestamp_weirdness_no_refresh_aging(self):
n_days_ago, time_A, time_B = \
self._test_update_timestamp_weirdness(5, True)
# the timestamp of the SIBLING of the deleted, re-added record
# differs from the sibling of freshly added record.
self.assert_timestamps_equal(time_A, n_days_ago)
def test_update_timestamp_weirdness_refresh_no_aging(self):
n_days_ago, time_A, time_B = \
self._test_update_timestamp_weirdness(9, False)
self.assert_timestamps_equal(time_A, time_B)
def test_update_timestamp_weirdness_refresh_aging(self):
n_days_ago, time_A, time_B = \
self._test_update_timestamp_weirdness(9, True)
self.assert_timestamps_equal(time_A, time_B)
def test_aging_refresh(self):
name, txt = 'agingtest', ['test txt']
no_refresh = 100
refresh = 80
self.set_zone_int_params(NoRefreshInterval=no_refresh,
RefreshInterval=refresh,
Aging=1)
before_mod = self.dns_update_record(name, txt)
start_time = before_mod.dwTimeStamp
# go back 86 hours, which is in the no-refresh time (but
# wouldn't be if we had stuck to the default of 84).
self.ldap_modify_timestamps(name, -86)
rec = self.dns_update_record(name, txt)
self.assert_timestamps_equal(rec.dwTimeStamp,
start_time - 86)
# back to -102 hours, into the refresh zone
# the update should reset the timestamp to now.
self.ldap_modify_timestamps(name, -16)
rec = self.dns_update_record(name, txt)
self.assert_soon_after(rec.dwTimeStamp, start_time)
# back to -182 hours, beyond the end of the refresh period.
# Actually nothing changes at this time -- we can still
# refresh, but the record is liable for scavenging.
self.ldap_modify_timestamps(name, -182)
rec = self.dns_update_record(name, txt)
self.assert_soon_after(rec.dwTimeStamp, start_time)
def test_add_no_timestamp(self):
# check zero timestamp is implicit
self.set_aging(True)
rec = self.ldap_update_record('ldap', 'test')
self.assertEqual(rec.dwTimeStamp, 0)
rec = self.rpc_update_record('rpc', 'test')
self.assertEqual(rec.dwTimeStamp, 0)
def test_add_zero_timestamp(self):
rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=0)
self.assertEqual(rec.dwTimeStamp, 0)
rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=0)
self.assertEqual(rec.dwTimeStamp, 0)
def test_add_update_timestamp(self):
# LDAP can change timestamp, RPC can't
rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=123456)
self.assertEqual(rec.dwTimeStamp, 123456)
rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456)
self.assertEqual(rec.dwTimeStamp, 0)
# second time is a different code path (add vs update)
rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456)
self.assertEqual(rec.dwTimeStamp, 0)
# RPC update the one with timestamp, zeroing it.
rec = self.rpc_update_record('ldap', 'test', dwTimeStamp=123456)
self.assertEqual(rec.dwTimeStamp, 0)
def test_add_update_ttl(self):
# RPC *can* set dwTtlSeconds.
rec = self.ldap_update_record('ldap', 'test',
dwTtlSeconds=1234)
self.assertEqual(rec.dwTtlSeconds, 1234)
rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234)
self.assertEqual(rec.dwTtlSeconds, 1234)
# does update work like add?
rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321)
self.assertEqual(rec.dwTtlSeconds, 4321)
rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678)
self.assertEqual(rec.dwTtlSeconds, 5678)
def test_add_update_ttl_serial(self):
# when setting dwTtlSeconds, what happens to serial number?
rec = self.ldap_update_record('ldap', 'test',
dwTtlSeconds=1234,
dwSerial=123)
self.assertEqual(rec.dwTtlSeconds, 1234)
self.assertEqual(rec.dwSerial, 123)
rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234)
self.assertEqual(rec.dwTtlSeconds, 1234)
serial = rec.dwSerial
self.assertLess(serial, 4)
rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321)
self.assertEqual(rec.dwTtlSeconds, 4321)
self.assertEqual(rec.dwSerial, serial + 1)
rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678)
self.assertEqual(rec.dwTtlSeconds, 5678)
self.assertEqual(rec.dwSerial, 124)
def test_add_update_dwFlags(self):
# dwFlags splits into rank and flags.
# according to [MS-DNSP] 2.3.2.2, flags MUST be zero
rec = self.ldap_update_record('ldap', 'test', flags=22222, rank=222)
self.assertEqual(rec.flags, 22222)
self.assertEqual(rec.rank, 222)
rec = self.rpc_update_record('ldap', 'test', dwFlags=3333333)
# rank != 3333333 & 0xff == 213
self.assertEqual(rec.rank, 240) # RPC fixes rank
self.assertEqual(rec.flags, 0)
self.assertRaises(OverflowError,
self.ldap_update_record,
'ldap', 'test', flags=777777777, rank=777)
# reset to no default (rank overflows)
rec = self.ldap_update_record('ldap', 'test', flags=7777, rank=777)
self.assertEqual(rec.flags, 7777)
self.assertEqual(rec.rank, 9)
# DNS update zeros flags, sets rank to 240 (RANK_ZONE)
rec = self.dns_update_record('ldap', 'test', ttl=999)
self.assertEqual(rec.flags, 0)
self.assertEqual(rec.rank, 240)
rec = self.rpc_update_record('ldap', 'test', dwFlags=321)
self.assertEqual(rec.flags, 0)
self.assertEqual(rec.rank, 240)
# RPC adding a new record: fixed rank, zero flags
rec = self.rpc_update_record('ldap', 'test 2', dwFlags=12345)
self.assertEqual(rec.rank, 240)
self.assertEqual(rec.flags, 0)
def test_add_update_dwReserved(self):
# RPC does not change dwReserved.
rec = self.ldap_update_record('ldap', 'test', dwReserved=54321)
self.assertEqual(rec.dwReserved, 54321)
rec = self.rpc_update_record('rpc', 'test', dwReserved=54321)
self.assertEqual(rec.dwReserved, 0)
rec = self.rpc_update_record('rpc', 'test', dwReserved=54321)
self.assertEqual(rec.dwReserved, 0)
rec = self.rpc_update_record('ldap', 'test', dwReserved=12345)
self.assertEqual(rec.dwReserved, 54321)
def test_add_update_dwSerial(self):
# On Windows the RPC record ends up with serial 2, on Samba
# serial 3. Rather than knownfail this, we accept anything
# below 4 (for now).
rec = self.ldap_update_record('ldap', 'test', dwSerial=123)
self.assertEqual(rec.dwSerial, 123)
rec = self.rpc_update_record('rpc', 'test', dwSerial=123)
self.assertLess(rec.dwSerial, 4)
rec = self.rpc_update_record('rpc', 'test', dwSerial=123)
self.assertLess(rec.dwSerial, 4)
rec = self.dns_update_record('rpc', 'test')
self.assertLess(rec.dwSerial, 4)
rec = self.dns_update_record('dns-0', 'test')
self.assertLess(rec.dwSerial, 5)
rec = self.dns_update_record('ldap', 'test')
self.assertEqual(rec.dwSerial, 123)
rec = self.rpc_update_record('ldap', 'test', dwSerial=123)
self.assertEqual(rec.dwSerial, 123)
rec = self.ldap_update_record('ldap', 'test', dwSerial=12)
self.assertEqual(rec.dwSerial, 12)
# when we dns-updated ldap/test, we alerted Windows to 123 as
# a high water mark for the zone. (even though we have since
# dropped the serial to 12, 123 is the base serial for new
# records).
rec = self.dns_update_record('dns', 'test')
self.assertEqual(rec.dwSerial, 124)
rec = self.dns_update_record('dns2', 'test')
self.assertEqual(rec.dwSerial, 125)
rec = self.rpc_update_record('rpc2', 'test')
self.assertEqual(rec.dwSerial, 126)
rec = self.dns_update_record('dns', 'test 2')
self.assertEqual(rec.dwSerial, 127)
def test_add_update_dwSerial_2(self):
# On Samba the RPC update resets the serial to a low number,
# while Windows leaves it high.
rec = self.ldap_update_record('ldap', 'test', dwSerial=123)
self.assertEqual(rec.dwSerial, 123)
rec = self.rpc_update_record('ldap', 'test', dwSerial=321)
self.assertEqual(rec.dwSerial, 123)
rec = self.dns_update_record('ldap', 'test')
self.assertEqual(rec.dwSerial, 123)
def test_add_update_many(self):
# Samba fails often in this set, but we want to see how it
# goes further down, so we print the problems and defer the
# failure.
failures = 0
total = 0
def _defer_wrap(f):
def _defer(*args):
nonlocal failures, total
total += 1
try:
f(*args)
except self.failureException as e:
from traceback import format_stack
print(f"{format_stack()[-2]} {e}\n")
failures += 1
return _defer
defer_assertEqual = _defer_wrap(self.assertEqual)
defer_assert_timestamp_in_ballpark = \
_defer_wrap(self.assert_timestamp_in_ballpark)
self.set_aging(False)
rec = self.ldap_update_record('ldap', 'test',
version=11,
rank=22,
flags=33,
dwSerial=44,
dwTtlSeconds=55,
dwReserved=66,
dwTimeStamp=77)
self.assertEqual(rec.version, 5) # disobeys request
self.assertEqual(rec.rank, 22)
self.assertEqual(rec.flags, 33)
self.assertEqual(rec.dwSerial, 44)
self.assertEqual(rec.dwTtlSeconds, 55)
self.assertEqual(rec.dwReserved, 66)
self.assertEqual(rec.dwTimeStamp, 77)
# DNS updates first
rec = self.dns_update_record('ldap', 'test', ttl=999)
self.assertEqual(rec.version, 5)
self.assertEqual(rec.rank, 240) # rank gets fixed by DNS update
defer_assertEqual(rec.flags, 0) # flags gets fixed
defer_assertEqual(rec.dwSerial, 45) # serial increments
self.assertEqual(rec.dwTtlSeconds, 999) # TTL set
defer_assertEqual(rec.dwReserved, 0) # reserved fixed
defer_assert_timestamp_in_ballpark(rec) # changed on Windows ?!
self.set_aging(True)
rec = self.dns_update_record('ldap', 'test', ttl=1111)
self.assertEqual(rec.version, 5)
self.assertEqual(rec.rank, 240)
defer_assertEqual(rec.flags, 0)
defer_assertEqual(rec.dwSerial, 46)
self.assertEqual(rec.dwTtlSeconds, 1111) # TTL set
defer_assertEqual(rec.dwReserved, 0)
self.assert_timestamp_in_ballpark(rec)
# RPC update
rec = self.rpc_update_record('ldap', 'test',
version=111,
dwFlags=333,
dwSerial=444,
dwTtlSeconds=555,
dwReserved=666,
dwTimeStamp=777)
self.assertEqual(rec.version, 5) # no change
self.assertEqual(rec.rank, 240) # no change
defer_assertEqual(rec.flags, 0) # no change
defer_assertEqual(rec.dwSerial, 47) # Serial increments
self.assertEqual(rec.dwTtlSeconds, 555) # TTL set
defer_assertEqual(rec.dwReserved, 0) # no change
self.assertEqual(rec.dwTimeStamp, 0) # timestamp zeroed
# RPC update, using default values
rec = self.rpc_update_record('ldap', 'test')
self.assertEqual(rec.version, 5)
self.assertEqual(rec.rank, 240)
defer_assertEqual(rec.flags, 0)
defer_assertEqual(rec.dwSerial, 48) # serial increments
self.assertEqual(rec.dwTtlSeconds, 900) # TTL changed
defer_assertEqual(rec.dwReserved, 0)
self.assertEqual(rec.dwTimeStamp, 0)
self.set_aging(False)
rec = self.dns_update_record('ldap', 'test', ttl=888)
self.assertEqual(rec.version, 5)
self.assertEqual(rec.rank, 240)
defer_assertEqual(rec.flags, 0)
defer_assertEqual(rec.dwSerial, 49) # serial increments
self.assertEqual(rec.dwTtlSeconds, 888) # TTL set
defer_assertEqual(rec.dwReserved, 0)
self.assertEqual(rec.dwTimeStamp, 0) # timestamp stays zero
if failures:
self.fail(f"failed {failures}/{total} defered assertions")
def test_static_record_dynamic_update(self):
"""Add a static record, then a dynamic record.
The dynamic record should have a timestamp set."""
name = 'test'
txt = ['static txt']
txt2 = ['dynamic txt']
self.set_aging(True)
rec = self.ldap_update_record(name, txt, dwTimeStamp=0)
rec2 = self.dns_update_record(name, txt2)
self.assert_timestamp_in_ballpark(rec2)
ts2 = rec2.dwTimeStamp
# update the first record. It should stay static (timestamp 0)
rec = self.dns_update_record(name, txt)
self.assertEqual(rec.dwTimeStamp, 0)
# and rec2 should be unchanged.
self.assertEqual(rec2.dwTimeStamp, ts2)
def test_dynamic_record_static_update(self):
name = 'agingtest'
txt1 = ['dns update before']
txt2 = ['ldap update']
txt3 = ['dns update after']
self.set_aging(True)
self.dns_update_record(name, txt1)
self.ldap_update_record(name, txt2)
self.dns_update_record(name, txt3)
recs = self.get_rpc_records(name)
for r in recs:
d = [x.str for x in r.data.str]
if d == txt1:
self.assertNotEqual(r.dwTimeStamp, 0)
elif d == txt2:
self.assertEqual(r.dwTimeStamp, 0)
elif d == txt3:
self.assertNotEqual(r.dwTimeStamp, 0)
def test_basic_scavenging(self):
# NOTE: This one fails on Windows, because the RPC call to
# prompt scavenging is not immediate. On Samba, in the
# testenv, we don't have the RPC call but we can connect to
# the database directly.
# just to be sure we have the right limits.
self.set_zone_int_params(NoRefreshInterval=84,
RefreshInterval=84,
Aging=1)
ts1, ts2, ts3, ts4, ts5, ts6 = ('1', '2', '3', '4', '5', '6')
self.dns_update_record(ts1, ts1)
self.dns_update_record(ts2, ts2)
# ts2 is tombstoned and timestamped in 1981
self.dns_tombstone(ts2)
# ts3 is tombstoned and timestamped in the future
self.dns_tombstone(ts3, epoch_hours=(DNS_TIMESTAMP_2101 - 1))
# ts4 is tombstoned and timestamped in the past
self.dns_tombstone(ts4, epoch_hours=1111111)
# ts5 is tombstoned in the past and timestamped in the future
self.dns_tombstone(ts5, epoch_hours=5555555, epoch_nttime=int(1e10))
# ts2 and ts3 should now be tombstoned.
self.assert_tombstoned(ts2)
self.assert_tombstoned(ts3)
# let's un-tombstone ts2
# ending up with dnsTombstoned: FALSE in Samba
# and no dNSTombstoned in Windows.
self.dns_update_record(ts2, "ts2 untombstoned")
ts2_node = self.get_one_node(ts2)
ts2_tombstone = ts2_node.get("dNSTombstoned")
if ts2_tombstone is not None:
self.assertEqual(ts2_tombstone[0], b"FALSE")
self.assert_tombstoned(ts2, tombstoned=False)
r = self.dns_update_record(ts6, ts6)
# put some records into the death zone.
self.ldap_modify_timestamps(ts1, -15 * 24)
self.ldap_modify_timestamps(ts2, -14 * 24 - 2)
self.ldap_modify_timestamps(ts6, -14 * 24 + 2)
# ts1 will be saved by this record
self.dns_update_record(ts1, "another record")
try:
# Tell the server to clean-up records.
# This is how it *should* work on Windows:
self.rpc_conn.DnssrvOperation2(
dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
SERVER_IP,
None,
0,
"StartScavenging",
dnsserver.DNSSRV_TYPEID_NULL,
None)
# Samba won't get here (NOT_IMPLEMENTED error)
# wait for Windows to do its cleanup.
time.sleep(2)
except WERRORError as e:
if e.args[0] == werror.WERR_CALL_NOT_IMPLEMENTED:
# This is the Samba way, talking to the file directly,
# as if we were the server process. The direct
# connection is needed because the tombstoning search
# involves a magic system only filter.
file_samdb = get_file_samdb()
dsdb._scavenge_dns_records(file_samdb)
dsdb._dns_delete_tombstones(file_samdb)
else:
raise
# Now what we should have:
# ts1: alive: the old record is deleted, the new one not.
# ts2: tombstoned
# ts3: tombstoned
# ts4: deleted. gone.
# ts5: deleted. timestamp affects tombstoning, but not deletion.
# ts6: alive
#
# We order our assertions to make the windows test
# fail as late as possible (on ts4, ts5, ts2).
r = self.get_unique_txt_record(ts1, ["another record"])
self.assertIsNotNone(r)
r = self.get_unique_txt_record(ts6, [ts6])
self.assertIsNotNone(r)
self.assert_tombstoned(ts3)
n = self.get_one_node(ts4)
self.assertIsNone(n)
n = self.get_one_node(ts5)
self.assertIsNone(n)
self.assert_tombstoned(ts2)
def _test_A_and_AAAA_records(self, A, B, a_days, b_days, aging):
self.set_aging(aging)
name = 'aargh'
now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
a_initial = now - 24 * a_days
b_initial = now - 24 * b_days
self.dns_update_non_text(name, A)
self.ldap_modify_timestamps(name, a_days * -24)
rec_a = self.get_unique_ip_record(name, A)
rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)
self.assert_timestamps_equal(rec_a, a_initial)
self.assert_timestamps_equal(rec_b, b_initial)
# touch the A record.
self.dns_update_non_text(name, A)
# check the A timestamp, depending on norefresh
rec_a = self.get_unique_ip_record(name, A)
if aging and a_days > 7:
time_a = now
self.assert_soon_after(rec_a, now)
elif a_days > 7:
# when we have NO aging and are in the refresh window, the
# timestamp now reads as a_initial, but will become now
# after we manipulate B for a bit.
time_a = now
self.assert_timestamps_equal(rec_a, a_initial)
else:
time_a = a_initial
self.assert_timestamps_equal(rec_a, a_initial)
# B timestamp should be unchanged?
rec_b = self.get_unique_ip_record(name, B)
self.assert_timestamps_equal(rec_b, b_initial)
# touch the B record.
self.dns_update_non_text(name, B)
# check the B timestamp
rec_b = self.get_unique_ip_record(name, B)
self.assert_soon_after(rec_b, now)
# rewind B
rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)
# NOW rec A might have changed! with no aging, and out of refresh.
rec_a = self.get_unique_ip_record(name, A)
self.assert_timestamps_equal(rec_a, time_a)
self.dns_update_non_text(name, A)
rec_a = self.get_unique_ip_record(name, B)
self.assert_timestamps_equal(rec_b, b_initial)
# now delete A
_, wtype = guess_wtype(A)
self.ldap_delete_record(name, A, wtype=wtype)
# re-add it
self.dns_update_non_text(name, A)
rec_a = self.get_unique_ip_record(name, A)
self.assert_soon_after(rec_a, now)
rec_b = self.get_unique_ip_record(name, B)
self.assert_timestamps_equal(rec_b, b_initial)
def test_A_5_days_AAAA_5_days_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=True)
def test_A_5_days_AAAA_5_days_no_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=False)
def test_A_5_days_AAAA_10_days_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=True)
def test_A_5_days_AAAA_10_days_no_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=False)
def test_A_10_days_AAAA_5_days_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=True)
def test_A_10_days_AAAA_5_days_no_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=False)
def test_A_10_days_AAAA_9_days_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 9, aging=True)
def test_A_9_days_AAAA_10_days_no_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 9, 10, aging=False)
def test_A_20_days_AAAA_2_days_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 20, 2, aging=True)
def test_A_6_days_AAAA_40_days_no_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 6, 40, aging=False)
def test_A_5_days_A_5_days_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 5, aging=True)
def test_A_5_days_A_10_days_no_aging(self):
self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 10, aging=False)
def test_AAAA_5_days_AAAA_6_days_aging(self):
self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=True)
def test_AAAA_5_days_AAAA_6_days_no_aging(self):
self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=False)
TestProgram(module=__name__, opts=subunitopts)