# Unix SMB/CIFS implementation. # Copyright (C) Kai Blin 2011 # Copyright (C) Catalyst.NET 2021 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # import sys from samba import dsdb from samba import dsdb_dns from samba.ndr import ndr_unpack, ndr_pack from samba.samdb import SamDB from samba.auth import system_session import ldb from samba import credentials from samba.dcerpc import dns, dnsp, dnsserver from samba.dnsserver import TXTRecord, ARecord from samba.dnsserver import recbuf_from_string, ipv6_normalise from samba.tests.subunitrun import SubunitOptions, TestProgram from samba import werror, WERRORError from samba.tests.dns_base import DNSTest import samba.getopt as options import optparse import time from samba.colour import c_RED, c_GREEN, c_DARK_YELLOW parser = optparse.OptionParser( "dns_aging.py [options]") sambaopts = options.SambaOptions(parser) parser.add_option_group(sambaopts) # use command line creds if available credopts = options.CredentialsOptions(parser) parser.add_option_group(credopts) subunitopts = SubunitOptions(parser) parser.add_option_group(subunitopts) opts, args = parser.parse_args() if len(args) < 2: parser.print_usage() sys.exit(1) LP = sambaopts.get_loadparm() CREDS = credopts.get_credentials(LP) SERVER_NAME = args[0] SERVER_IP = args[1] CREDS.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE) DOMAIN = CREDS.get_realm().lower() # Unix time start, in DNS timestamp (24 * 365.25 * 369) # These are ballpark extremes for the timestamp. DNS_TIMESTAMP_1970 = 3234654 DNS_TIMESTAMP_2101 = 4383000 DNS_TIMESTAMP_1981 = 3333333 # a middling timestamp IPv4_ADDR = "127.0.0.33" IPv6_ADDR = "::1" IPv4_ADDR_2 = "127.0.0.66" IPv6_ADDR_2 = "1::1" def get_samdb(): return SamDB(url=f"ldap://{SERVER_IP}", lp=LP, session_info=system_session(), credentials=CREDS) def get_file_samdb(): # For Samba only direct file access, needed for the tombstoning functions. # (For Windows, we instruct it to tombstone over RPC). return SamDB(url=LP.samdb_url(), lp=LP, session_info=system_session(), credentials=CREDS) def get_rpc(): return dnsserver.dnsserver(f"ncacn_ip_tcp:{SERVER_IP}[sign]", LP, CREDS) def create_zone(name, rpc=None, aging=True): if rpc is None: rpc = get_rpc() z = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN() z.pszZoneName = name z.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY z.fAging = int(bool(aging)) z.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT z.fDsIntegrated = 1 z.fLoadExisting = 1 z.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, SERVER_IP, None, 0, 'ZoneCreate', dnsserver.DNSSRV_TYPEID_ZONE_CREATE, z) def delete_zone(name, rpc=None): if rpc is None: rpc = get_rpc() rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, SERVER_IP, name, 0, 'DeleteZoneFromDs', dnsserver.DNSSRV_TYPEID_NULL, None) def txt_s_list(txt): """Construct a txt record string list, which is a fiddly matter.""" if isinstance(txt, str): txt = [txt] s_list = dnsp.string_list() s_list.count = len(txt) s_list.str = txt return s_list def make_txt_record(txt): r = dns.txt_record() r.txt = txt_s_list(txt) return r def copy_rec(rec): copy = dnsserver.DNS_RPC_RECORD() copy.wType = rec.wType copy.dwFlags = rec.dwFlags copy.dwSerial = rec.dwSerial copy.dwTtlSeconds = rec.dwTtlSeconds copy.data = rec.data copy.dwTimeStamp = rec.dwTimeStamp return copy def guess_wtype(data): if isinstance(data, list): data = make_txt_record(data) return (data, dnsp.DNS_TYPE_TXT) if ":" in data: return (data, dnsp.DNS_TYPE_AAAA) return (data, dnsp.DNS_TYPE_A) class TestDNSAging(DNSTest): """Probe DNS aging and scavenging, using LDAP and RPC to set and test the timestamps behind DNS's back.""" server = SERVER_NAME server_ip = SERVER_IP creds = CREDS def setUp(self): super().setUp() self.rpc_conn = get_rpc() self.samdb = get_samdb() # We always have a zone of our own named after the test function. self.zone = self.id().rsplit('.', 1)[1] self.addCleanup(delete_zone, self.zone, self.rpc_conn) try: create_zone(self.zone, self.rpc_conn) except WERRORError as e: if e.args[0] != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS: raise print(f"zone {self.zone} already exists") # Though we set this in create_zone(), that doesn't work on # Windows, so we repeat again here. self.set_zone_int_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE) self.zone_dn = (f"DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones," f"{self.samdb.get_default_basedn()}") def set_zone_int_params(self, zone=None, **kwargs): """Keyword arguments set parameters on the zone. e.g.: self.set_zone_int_params(Aging=1, RefreshInterval=222) See [MS-DNSP] 3.1.1.2.1 "DNS Zone Integer Properties" for names. """ if zone is None: zone = self.zone for key, val in kwargs.items(): name_param = dnsserver.DNS_RPC_NAME_AND_PARAM() name_param.dwParam = val name_param.pszNodeName = key try: self.rpc_conn.DnssrvOperation2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, SERVER_IP, zone, 0, 'ResetDwordProperty', dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM, name_param) except WERRORError as e: self.fail(str(e)) def rpc_replace(self, name, old=None, new=None): """Replace a DNS_RPC_RECORD or DNS_RPC_RECORD_BUF""" # wrap our recs, if necessary if isinstance(new, dnsserver.DNS_RPC_RECORD): rec = new new = dnsserver.DNS_RPC_RECORD_BUF() new.rec = rec if isinstance(old, dnsserver.DNS_RPC_RECORD): rec = old old = dnsserver.DNS_RPC_RECORD_BUF() old.rec = rec try: self.rpc_conn.DnssrvUpdateRecord2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, SERVER_IP, self.zone, name, new, old) except WERRORError as e: self.fail(f"could not replace record ({e})") def get_unique_txt_record(self, name, txt): """Get the TXT record on Name with value txt, asserting that there is only one.""" if isinstance(txt, str): txt = [txt] recs = self.ldap_get_records(name) match = None for r in recs: if r.wType != dnsp.DNS_TYPE_TXT: continue txt2 = [x for x in r.data.str] if txt2 == txt: self.assertIsNone(match) match = r return match def get_unique_ip_record(self, name, addr, wtype=None): """Get an A or AAAA record on name with the matching data.""" if wtype is None: addr, wtype = guess_wtype(addr) recs = self.ldap_get_records(name) # We need to use the internal dns_record_match because not all # forms always match on strings (e.g. IPv6) rec = dnsp.DnssrvRpcRecord() rec.wType = wtype rec.data = addr match = None for r in recs: if dsdb_dns.records_match(r, rec): self.assertIsNone(match) match = r return match def dns_query(self, name, qtype=dns.DNS_QTYPE_ALL): """make a query, which might help Windows notice LDAP changes""" p = self.make_name_packet(dns.DNS_OPCODE_QUERY) fullname = "%s.%s" % (name, self.zone) q = self.make_name_question(fullname, qtype, dns.DNS_QCLASS_IN) self.finish_name_packet(p, [q]) r, rp = self.dns_transaction_udp(p, host=SERVER_IP) return r def dns_update_non_text(self, name, data, wtype=None, qclass=dns.DNS_QCLASS_IN): if wtype is None: data, wtype = guess_wtype(data) if qclass == dns.DNS_QCLASS_IN: ttl = 123 else: ttl = 0 fullname = "%s.%s" % (name, self.zone) p = self.make_name_packet(dns.DNS_OPCODE_UPDATE) u = self.make_name_question(self.zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN) self.finish_name_packet(p, [u]) r = dns.res_rec() r.name = fullname r.rr_type = wtype r.rr_class = qclass r.ttl = ttl if data is not None: r.length = 0xffff r.rdata = data else: r.length = 0 p.nscount = 1 p.nsrecs = [r] (code, response) = self.dns_transaction_udp(p, host=SERVER_IP) self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK) return response def dns_delete(self, name, data, wtype=None): return self.dns_update_non_text(name, data, wtype, qclass=dns.DNS_QCLASS_NONE) def dns_delete_type(self, name, wtype): return self.dns_update_non_text(name, None, wtype, qclass=dns.DNS_QCLASS_ANY) def dns_update_record(self, name, txt, ttl=900): if isinstance(txt, str): txt = [txt] p = self.make_txt_update(name, txt, self.zone, ttl=ttl) (code, response) = self.dns_transaction_udp(p, host=SERVER_IP) if code.operation & dns.DNS_RCODE == dns.DNS_RCODE_REFUSED: # sometimes you might forget this print("\n\ngot DNS_RCODE_REFUSED\n") print("Are you running this in the fl2003 environment?\n") print("try `SELFTEST_TESTENV='fl2003dc:local' make testenv`\n\n") self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK) return self.get_unique_txt_record(name, txt) def rpc_update_record(self, name, txt, **kwargs): """Add the record that self.dns_update_record() would add, via the dnsserver RPC pipe. As with DNS update, if the record already exists, we replace it. """ if isinstance(txt, str): txt = [txt] old = TXTRecord(txt) rec = TXTRecord(txt) for k, v in kwargs.items(): setattr(rec, k, v) try: self.rpc_replace(name, old, rec) except AssertionError as e: # we have caught and wrapped the WERRor inside if 'WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST' not in str(e): raise self.rpc_replace(name, None, rec) return self.get_unique_txt_record(name, txt) def rpc_delete_txt(self, name, txt): if isinstance(txt, str): txt = [txt] old = TXTRecord(txt) self.rpc_replace(name, old, None) def get_one_node(self, name): expr = f"(&(objectClass=dnsNode)(name={name}))" nodes = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_SUBTREE, expression=expr, attrs=["dnsRecord", "dNSTombstoned", "name"]) if len(nodes) > 1: self.fail( f"expected 0 or 1 dnsNodes for {name}, found {len(nodes)}") if len(nodes) == 0: return None return nodes[0] def ldap_get_records(self, name): node = self.get_one_node(name) if node is None: return [] records = node.get('dnsRecord') return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records] def ldap_get_non_tombstoned_records(self, name): all_records = self.ldap_get_records(name) records = [] for r in all_records: if r.wType != dnsp.DNS_TYPE_TOMBSTONE: records.append(r) return records def assert_tombstoned(self, name, tombstoned=True, timestamp=None): # If run with tombstoned=False, assert it isn't tombstoned # (and has no traces of tombstone). Otherwise assert it has # all the necessary bits. # # with timestamp=, we assert that # the nttime timestamp is about that time. # # with timestamp=None, we assert it is within a century or so. # # with timestamp=False (or 0), we don't assert on it. node = self.get_one_node(name) if node is None: self.fail(f"no node named {name}") dnsts = node.get("dNSTombstoned") if dnsts is None: is_tombstoned = False else: self.assertEqual(len(dnsts), 1) if dnsts[0] == b'TRUE': is_tombstoned = True else: is_tombstoned = False if tombstoned != is_tombstoned: if is_tombstoned: self.fail(f"{name} is tombstoned") else: self.fail(f"{name} is not tombstoned") recs = self.ldap_get_records(name) if is_tombstoned: self.assertEqual(len(recs), 1) self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) if timestamp is None: self.assert_nttime_in_hour_range(recs[0].data) elif timestamp: self.assert_nttime_in_hour_range(recs[0].data, timestamp - 3, timestamp + 3) else: for r in recs: self.assertNotEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE) def ldap_replace_records(self, name, records): # We use raw ldap to avoid the "helpfulness" of dsdb_dns.replace() dn = f'DC={name},{self.zone_dn}' msg = ldb.Message.from_dict(self.samdb, {'dn': dn, 'dnsRecord': [ndr_pack(r) for r in records] }, ldb.FLAG_MOD_REPLACE) try: self.samdb.modify(msg) except ldb.LdbError as e: if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]: raise # We need to do an add msg["objectClass"] = ["top", "dnsNode"] msg["dnsRecord"].set_flags(ldb.FLAG_MOD_ADD) self.samdb.add(msg) def ldap_update_core(self, name, wtype, data, **kwargs): """This one is not TXT specific.""" records = self.ldap_get_records(name) # default values rec = dnsp.DnssrvRpcRecord() rec.wType = wtype rec.rank = dnsp.DNS_RANK_ZONE rec.dwTtlSeconds = 900 rec.dwSerial = 110 rec.dwTimeStamp = 0 rec.data = data # override defaults, as required for k, v in kwargs.items(): setattr(rec, k, v) for i, r in enumerate(records[:]): if dsdb_dns.records_match(r, rec): records[i] = rec break else: # record not found records.append(rec) self.ldap_replace_records(name, records) return rec def ldap_update_record(self, name, txt, **kwargs): """Add the record that self.dns_update_record() would add, via ldap, thus allowing us to set additional dnsRecord features like dwTimestamp. """ rec = self.ldap_update_core(name, dnsp.DNS_TYPE_TXT, txt_s_list(txt), **kwargs) recs = self.ldap_get_records(name) match = None for r in recs: if r.wType != rec.wType: continue if r.data.str == rec.data.str: self.assertIsNone(match, f"duplicate records for {name}") match = r self.assertEqual(match.rank, rec.rank & 255) self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds) self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp) return match def ldap_delete_record(self, name, data, wtype=dnsp.DNS_TYPE_TXT): rec = dnsp.DnssrvRpcRecord() if wtype == dnsp.DNS_TYPE_TXT: data = txt_s_list(data) rec.wType = wtype rec.data = data records = self.ldap_get_records(name) for i, r in enumerate(records[:]): if dsdb_dns.records_match(r, rec): del records[i] break else: self.fail(f"record {data} not found") self.ldap_replace_records(name, records) def add_ip_record(self, name, addr, wtype=None, **kwargs): if wtype is None: addr, wtype = guess_wtype(addr) rec = self.ldap_update_core(name, wtype, addr, **kwargs) recs = self.ldap_get_records(name) match = None for r in recs: if dsdb_dns.records_match(r, rec): self.assertIsNone(match, f"duplicate records for {name}") match = r self.assertEqual(match.rank, rec.rank & 255) self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds) self.assert_timestamps_equal(match.dwTimeStamp, rec.dwTimeStamp) return match def ldap_modify_timestamps(self, name, delta): records = self.ldap_get_records(name) for rec in records: rec.dwTimeStamp += delta self.ldap_replace_records(name, records) def get_rpc_records(self, name, dns_type=None): if dns_type is None: dns_type = dnsp.DNS_TYPE_ALL select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA buflen, res = self.rpc_conn.DnssrvEnumRecords2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, SERVER_IP, self.zone, name, None, dns_type, select_flags, None, None) recs = [] if not res or res.count == 0: return [] for rec in res.rec: recs.extend(rec.records) return recs def dns_tombstone(self, name, epoch_hours=DNS_TIMESTAMP_1981, epoch_nttime=None): dn = f'DC={name},{self.zone_dn}' r = dnsp.DnssrvRpcRecord() r.wType = dnsp.DNS_TYPE_TOMBSTONE # r.dwTimeStamp is a 32 bit value in hours, and r.data is an # NTTIME (100 nanosecond intervals), both in the 1601 epoch. A # tombstome will have both, but expiration calculations use # the r.data NTTIME EntombedTime timestamp (see [MS-DNSP]). r.dwTimeStamp = epoch_hours if epoch_nttime is None: r.data = epoch_hours * 3600 * 10 * 1000 * 1000 else: r.data = epoch_nttime msg = ldb.Message.from_dict(self.samdb, {'dn': dn, 'dnsRecord': [ndr_pack(r)], 'dnsTombstoned': 'TRUE' }, ldb.FLAG_MOD_REPLACE) try: self.samdb.modify(msg) except ldb.LdbError as e: if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]: raise # We need to do an add msg["objectClass"] = ["top", "dnsNode"] self.samdb.add(msg) def set_aging(self, enable=False): self.set_zone_int_params(Aging=int(bool(enable))) def assert_timestamp_in_ballpark(self, rec): self.assertGreater(rec.dwTimeStamp, DNS_TIMESTAMP_1970) self.assertLess(rec.dwTimeStamp, DNS_TIMESTAMP_2101) def assert_nttime_in_hour_range(self, t, hour_min=DNS_TIMESTAMP_1970, hour_max=DNS_TIMESTAMP_2101): t //= int(3600 * 1e7) self.assertGreater(t, hour_min) self.assertLess(t, hour_max) def assert_soon_after(self, timestamp, reference): """Assert that a timestamp is the same or very slightly higher than a reference timestamp. Typically we expect the timestamps to be identical, unless an hour has clicked over since the reference was taken. However we allow one more hour in case it happens during a daylight savings transition or something. """ if hasattr(timestamp, 'dwTimeStamp'): timestamp = timestamp.dwTimeStamp if hasattr(reference, 'dwTimeStamp'): reference = reference.dwTimeStamp diff = timestamp - reference days = abs(diff / 24.0) if diff < 0: msg = f"timestamp is {days} days ({abs(diff)} hours) before reference" elif diff > 2: msg = f"timestamp is {days} days ({diff} hours) after reference" else: return raise AssertionError(msg) def assert_timestamps_equal(self, ts1, ts2): """Just like assertEqual(), but tells us the difference, not the absolute values. e.g: self.assertEqual(a, b) AssertionError: 3685491 != 3685371 self.assert_timestamps_equal(a, b) AssertionError: -120 (first is 5.0 days earlier than second) Also, we turn a record into a timestamp if we need """ if hasattr(ts1, 'dwTimeStamp'): ts1 = ts1.dwTimeStamp if hasattr(ts2, 'dwTimeStamp'): ts2 = ts2.dwTimeStamp if ts1 == ts2: return diff = ts1 - ts2 days = abs(diff / 24.0) if ts1 == 0 or ts2 == 0: # when comparing to zero we don't want the number of days. msg = f"timestamp {ts1} != {ts2}" elif diff > 0: msg = f"{ts1} is {days} days ({diff} hours) after {ts2}" else: msg = f"{ts1} is {days} days ({abs(diff)} hours) before {ts2}" raise AssertionError(msg) def test_update_timestamps_aging_off_then_on(self): # we will add a record with aging off # it will have the current timestamp self.set_aging(False) name = 'timestamp-now' name2 = 'timestamp-eightdays' rec = self.dns_update_record(name, [name]) start_time = rec.dwTimeStamp self.assert_timestamp_in_ballpark(rec) # alter the timestamp -8 days using RPC # with aging turned off, we expect no change # when aging is on, we expect change eight_days_ago = start_time - 8 * 24 rec = self.ldap_update_record(name2, [name2], dwTimeStamp=eight_days_ago) self.assert_timestamps_equal(rec.dwTimeStamp, eight_days_ago) # if aging was on, this would change rec = self.dns_update_record(name2, [name2]) self.assert_timestamps_equal(rec.dwTimeStamp, eight_days_ago) self.set_aging(True) rec = self.dns_update_record(name2, [name2]) self.assertGreaterEqual(rec.dwTimeStamp, start_time) def test_rpc_update_timestamps(self): # RPC always sets timestamps to zero on Windows. self.set_aging(False) name = 'timestamp-now' rec = self.dns_update_record(name, [name]) start_time = rec.dwTimeStamp self.assert_timestamp_in_ballpark(rec) # attempt to alter the timestamp to something close by. eight_days_ago = start_time - 8 * 24 rec = self.rpc_update_record(name, [name], dwTimeStamp=eight_days_ago) self.assertEqual(rec.dwTimeStamp, 0) # try again, with aging on self.set_aging(True) rec = self.rpc_update_record(name, [name], dwTimeStamp=eight_days_ago) self.assertEqual(rec.dwTimeStamp, 0) # now that the record is static, a dns update won't change it rec = self.dns_update_record(name, [name]) self.assertEqual(rec.dwTimeStamp, 0) # but another record on the same node will behave normally # i.e. the node is not static, the record is. name2 = 'timestamp-eightdays' rec = self.dns_update_record(name2, [name2]) self.assert_soon_after(rec.dwTimeStamp, start_time) def get_txt_timestamps(self, name, *txts): records = self.ldap_get_records(name) ret = [] for t in txts: for r in records: t2 = [x for x in r.data.str] if t == t2: ret.append(r.dwTimeStamp) return ret def test_update_aging_disabled_2(self): # With aging disabled, Windows updates the timestamps of all # records when one is updated. name = 'test' txt1 = ['test txt'] txt2 = ['test', 'txt2'] txt3 = ['test', 'txt3'] self.set_aging(False) current_time = self.dns_update_record(name, txt1).dwTimeStamp six_days_ago = current_time - 6 * 24 eight_days_ago = current_time - 8 * 24 fifteen_days_ago = current_time - 15 * 24 hundred_days_ago = current_time - 100 * 24 thousand_days_ago = current_time - 1000 * 24 for timestamp in (current_time, six_days_ago, eight_days_ago, fifteen_days_ago, hundred_days_ago, thousand_days_ago): # wind back self.ldap_update_record(name, txt1, dwTimeStamp=timestamp) self.assertEqual(self.get_txt_timestamps(name, txt1), [timestamp]) # no change here update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(update_timestamp, timestamp) # adding a fresh record for timestamp in (current_time, six_days_ago, eight_days_ago, fifteen_days_ago, hundred_days_ago, thousand_days_ago, 100000, 100): # wind back timestamp1 = self.ldap_update_record( name, txt1, dwTimeStamp=timestamp).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp) self.dns_update_record(name, txt2) timestamps = self.get_txt_timestamps(name, txt1, txt2) self.assertEqual(timestamps, [timestamp, current_time]) self.ldap_delete_record(name, txt2) timestamps = self.get_txt_timestamps(name, txt1) self.assertEqual(timestamps, [timestamp]) # add record 2. timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) for timestamp in (current_time, six_days_ago, eight_days_ago, fifteen_days_ago, hundred_days_ago, thousand_days_ago, 100000, 100): # wind back self.ldap_update_record(name, txt1, dwTimeStamp=timestamp) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp) timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp # txt1 timestamp is now current time timestamps = self.get_txt_timestamps(name, txt1, txt2) self.assertEqual(timestamps, [timestamp, current_time]) # with 3 records, no change for timestamp in (current_time, six_days_ago, eight_days_ago, fifteen_days_ago, hundred_days_ago, thousand_days_ago, 100000, 10): # wind back self.ldap_update_record(name, txt1, dwTimeStamp=timestamp) self.ldap_update_record(name, txt2, dwTimeStamp=timestamp) self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30)) timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp3, timestamp + 30) self.dns_update_record(name, txt2).dwTimeStamp timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3) self.assertEqual(timestamps, [timestamp, timestamp, timestamp + 30]) # with 3 records, one of which is static # first we set the updatee's timestamp to a recognisable number self.ldap_update_record(name, txt2, dwTimeStamp=999999) for timestamp in (current_time, six_days_ago, eight_days_ago, fifteen_days_ago, hundred_days_ago, thousand_days_ago, 100000, 10): # wind back self.ldap_update_record(name, txt1, dwTimeStamp=0) self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp - 9)) timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp3, timestamp - 9) self.dns_update_record(name, txt2) timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3) self.assertEqual(timestamps, [0, 999999, timestamp - 9]) # with 3 records, updating one which is static timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp for timestamp in (current_time, six_days_ago, eight_days_ago, fifteen_days_ago, hundred_days_ago, thousand_days_ago, 100000, 10): # wind back self.ldap_update_record(name, txt1, dwTimeStamp=0) self.ldap_update_record(name, txt2, dwTimeStamp=0) self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30)) timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp3, timestamp + 30) self.dns_update_record(name, txt2).dwTimeStamp timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3) self.assertEqual(timestamps, [0, 0, timestamp + 30]) # with 3 records, after the static nodes have been replaced self.ldap_update_record(name, txt1, dwTimeStamp=777777) self.ldap_update_record(name, txt2, dwTimeStamp=888888) timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp for timestamp in (current_time, six_days_ago, eight_days_ago, fifteen_days_ago, hundred_days_ago, thousand_days_ago, 100000, 10): # wind back self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp)) timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp3, timestamp) self.dns_update_record(name, txt2) timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3) self.assertEqual(timestamps, [777777, 888888, timestamp]) def _test_update_aging_disabled_n_days_ago(self, n_days): name = 'test' txt1 = ['1'] txt2 = ['2'] self.set_aging(False) current_time = self.dns_update_record(name, txt1).dwTimeStamp # rewind timestamp using ldap self.ldap_modify_timestamps(name, n_days * -24) n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assertGreater(current_time, n_days_ago) # no change when updating this record update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(update_timestamp, n_days_ago) # add another record, which should have the current timestamp timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) # get the original record timestamp. NOW it matches current_time timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp2) # let's repeat that, this time with txt2 existing self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, n_days_ago) # this update is not an add timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) # now timestamp1 is not changed timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, n_days_ago) # delete record2, try again self.ldap_delete_record(name, txt2) self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, n_days_ago) # here we are re-adding the deleted record timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp # It gets weird HERE. # note how the SIBLING of the deleted, re-added record differs # from the sibling of freshly added record, depending on the # time difference. if n_days <= 7: self.assert_timestamps_equal(timestamp1, n_days_ago) else: self.assert_timestamps_equal(timestamp1, timestamp2) # re-timestamp record2, try again self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago) self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, n_days_ago) # no change timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp2, n_days_ago) # also no change timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp2) # let's introduce another record txt3 = ['3'] self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago) self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp3, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp if n_days <= 7: self.assert_timestamps_equal(timestamp1, n_days_ago) else: self.assert_timestamps_equal(timestamp1, timestamp3) self.assert_timestamps_equal(timestamp2, timestamp3) self.ldap_delete_record(name, txt3) timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp3, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp if n_days <= 7: self.assert_timestamps_equal(timestamp1, n_days_ago) else: self.assert_timestamps_equal(timestamp1, timestamp3) self.assert_timestamps_equal(timestamp2, timestamp3) # and here we'll make txt3 static txt4 = ['4'] # and here we'll make txt1 static self.ldap_update_record(name, txt1, dwTimeStamp=0) self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago) self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp self.assertEqual(timestamp1, 0) self.assert_timestamps_equal(timestamp2, n_days_ago) self.assert_timestamps_equal(timestamp3, n_days_ago) self.assert_soon_after(timestamp4, current_time) def test_update_aging_disabled_in_no_refresh_window(self): self._test_update_aging_disabled_n_days_ago(4) def test_update_aging_disabled_on_no_refresh_boundary(self): self._test_update_aging_disabled_n_days_ago(7) def test_update_aging_disabled_in_refresh_window(self): self._test_update_aging_disabled_n_days_ago(9) def test_update_aging_disabled_beyond_refresh_window(self): self._test_update_aging_disabled_n_days_ago(16) def test_update_aging_disabled_in_eighteenth_century(self): self._test_update_aging_disabled_n_days_ago(100000) def test_update_aging_disabled_static(self): name = 'test' txt1 = ['1'] txt2 = ['2'] self.set_aging(False) current_time = self.dns_update_record(name, txt1).dwTimeStamp self.ldap_update_record(name, txt1, dwTimeStamp=0) # no change when updating this record timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assertEqual(timestamp1, 0) # add another record, which should have the current timestamp timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_soon_after(timestamp1, current_time) # let's repeat that, this time with txt2 existing timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assert_soon_after(timestamp2, current_time) timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) # delete record2, try again self.ldap_delete_record(name, txt2) self.ldap_update_record(name, txt1, dwTimeStamp=0) # no change when updating this record timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assertEqual(timestamp1, 0) timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assertEqual(timestamp2, 0) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assertEqual(timestamp1, 0) # re-timestamp record2, try again self.ldap_update_record(name, txt2, dwTimeStamp=1) self.ldap_update_record(name, txt1, dwTimeStamp=0) # no change when updating this record timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp2, 1) def test_update_aging_disabled(self): # With aging disabled, Windows updates the timestamps of all # records when one is updated. name = 'test' txt1 = ['test txt'] txt2 = ['test', 'txt2'] txt3 = ['test', 'txt3'] minus_6 = -6 * 24 minus_8 = -8 * 24 self.set_aging(False) current_time = self.dns_update_record(name, txt1).dwTimeStamp # rewind timestamp using ldap self.ldap_modify_timestamps(name, minus_6) after_mod = self.get_unique_txt_record(name, txt1) six_days_ago = after_mod.dwTimeStamp self.assert_timestamps_equal(six_days_ago, current_time + minus_6) # no change update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(update_timestamp, six_days_ago) self.check_query_txt(name, txt1, zone=self.zone) # another record timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp # without aging, timestamp1 is changed!! self.assert_timestamps_equal(timestamp1, timestamp2) # Set both records back to 8 days ago. self.ldap_modify_timestamps(name, minus_8) eight_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(eight_days_ago, current_time + minus_8) update2 = self.dns_update_record(name, txt2) # Without aging on, an update should not change the timestamps. self.assert_timestamps_equal(update2.dwTimeStamp, eight_days_ago) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, eight_days_ago) # Add another txt record. The new record should have the now # timestamp, and drag the others up with it. timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp3, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp3) self.assert_timestamps_equal(timestamp2, timestamp3) hundred_days_ago = current_time - 100 * 24 thousand_days_ago = current_time - 1000 * 24 record = self.ldap_update_record(name, txt1, dwTimeStamp=hundred_days_ago) self.assert_timestamps_equal(record.dwTimeStamp, hundred_days_ago) record = self.ldap_update_record(name, txt2, dwTimeStamp=thousand_days_ago) self.assert_timestamps_equal(record.dwTimeStamp, thousand_days_ago) # update 3, will others change (because beyond RefreshInterval)? yes. timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp3, current_time) self.assert_timestamps_equal(timestamp1, hundred_days_ago) self.assert_timestamps_equal(timestamp2, thousand_days_ago) fifteen_days_ago = current_time - 15 * 24 self.ldap_update_record(name, txt3, dwTimeStamp=fifteen_days_ago) timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp # DNS update has no effect because all records are old self.assert_timestamps_equal(timestamp2, thousand_days_ago) self.assert_timestamps_equal(timestamp1, hundred_days_ago) self.assert_timestamps_equal(timestamp3, fifteen_days_ago) # Does update of old record affect timestamp of refreshable record? No. self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago) timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp # DNS update has no effect because all records are old self.assert_timestamps_equal(timestamp2, thousand_days_ago) self.assert_timestamps_equal(timestamp1, hundred_days_ago) self.assert_timestamps_equal(timestamp3, eight_days_ago) # RPC zeros timestamp, after which updates won't change it. # BUT it refreshes all others! self.rpc_update_record(name, txt2) timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assertEqual(timestamp2, 0) self.assert_soon_after(timestamp1, current_time) self.assert_timestamps_equal(timestamp3, eight_days_ago) def test_update_aging_enabled(self): name = 'test' txt1 = ['test txt'] txt2 = ['test', 'txt2'] txt3 = ['test', 'txt3'] txt4 = ['4'] self.set_aging(True) current_time = self.dns_update_record(name, txt2).dwTimeStamp six_days_ago = current_time - 6 * 24 eight_days_ago = current_time - 8 * 24 fifteen_days_ago = current_time - 15 * 24 hundred_days_ago = current_time - 100 * 24 self.ldap_update_record(name, txt1, dwTimeStamp=six_days_ago) # with or without aging, a delta of -6 days does not affect # timestamps, because dwNoRefreshInterval is 7 days. timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp1, six_days_ago) self.assert_soon_after(timestamp2, current_time) self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago) timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp3, eight_days_ago) # update 1, what happens to 2 and 3? Nothing? timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp1, six_days_ago) self.assert_soon_after(timestamp2, current_time) self.assert_timestamps_equal(timestamp3, eight_days_ago) # now set 1 to 8 days, and we should see changes self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago) # update 1, what happens to 2 and 3? Nothing? timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp1, current_time) self.assert_soon_after(timestamp2, current_time) self.assert_timestamps_equal(timestamp3, eight_days_ago) # next few ones use these numbers self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago) self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago) self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago) # change even though 1 is outside the window timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp1, current_time) self.assert_timestamps_equal(timestamp2, six_days_ago) self.assert_timestamps_equal(timestamp3, eight_days_ago) # reset 1 self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago) # no change, because 2 is outside the window timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp1, fifteen_days_ago) self.assert_timestamps_equal(timestamp2, six_days_ago) self.assert_timestamps_equal(timestamp3, eight_days_ago) # 3 changes, others do not timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp1, fifteen_days_ago) self.assert_timestamps_equal(timestamp2, six_days_ago) self.assert_soon_after(timestamp3, current_time) # reset 3 to 100 days self.ldap_update_record(name, txt3, dwTimeStamp=hundred_days_ago) # 3 changes, others do not timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp1, fifteen_days_ago) self.assert_timestamps_equal(timestamp2, six_days_ago) self.assert_soon_after(timestamp3, current_time) # reset 1 and 3 to 8 days. does update of 1 affect 3? self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago) self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago) # 1 changes, others do not timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp1, current_time) self.assert_timestamps_equal(timestamp2, six_days_ago) self.assert_timestamps_equal(timestamp3, eight_days_ago) # Try an RPC update, zeroing 1 --> what happens to 3? timestamp1 = self.rpc_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assertEqual(timestamp1, 0) self.assert_timestamps_equal(timestamp2, six_days_ago) self.assert_timestamps_equal(timestamp3, eight_days_ago) # with 2 and 3 at 8 days, does static record change things? self.ldap_update_record(name, txt2, dwTimeStamp=eight_days_ago) # 2 changes, but to zero! timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp1, 0) self.assert_timestamps_equal(timestamp2, 0) self.assert_timestamps_equal(timestamp3, eight_days_ago) self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago) self.ldap_update_record(name, txt1, dwTimeStamp=3000000) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, 3000000) # dns update remembers that node is static, even with no # static records. timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assertEqual(timestamp1, 0) # Add another txt record. The new record should have the now # timestamp, and the others should remain unchanged. # BUT somehow record 1 is static!? timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp self.assert_timestamps_equal(timestamp1, 0) self.assert_timestamps_equal(timestamp2, six_days_ago) self.assert_timestamps_equal(timestamp3, eight_days_ago) self.assert_timestamps_equal(timestamp4, 0) def _test_update_aging_enabled_n_days_ago(self, n_days): name = 'test' txt1 = ['1'] txt2 = ['2'] delta = n_days * -24 self.set_aging(True) current_time = self.dns_update_record(name, txt1).dwTimeStamp # rewind timestamp using ldap self.ldap_modify_timestamps(name, delta) n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assertGreater(current_time, n_days_ago) # update changes timestamp depending on time. timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp if n_days <= 7: self.assert_timestamps_equal(timestamp1, n_days_ago) else: self.assert_soon_after(timestamp1, current_time) # add another record, which should have the current timestamp timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) # first record should not have changed timestamp1_b = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp1_b) # let's repeat that, this time with txt2 existing self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp1_b) # this update is not an add. record 2 is already up-to-date timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) # now timestamp1 is not changed timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp1_b) # delete record2, try again self.ldap_delete_record(name, txt2) self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp if n_days <= 7: self.assert_timestamps_equal(timestamp1, n_days_ago) else: self.assert_soon_after(timestamp1, current_time) # here we are re-adding the deleted record timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_soon_after(timestamp2, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp # It gets weird HERE. # note how the SIBLING of the deleted, re-added record differs # from the sibling of freshly added record, depending on the # time difference. if n_days <= 7: self.assert_timestamps_equal(timestamp1, n_days_ago) else: self.assert_timestamps_equal(timestamp1, timestamp2) # re-timestamp record2, try again self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago) self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) # this should make no difference timestamp1_b = self.dns_update_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp1_b) # no change timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp2, timestamp1) # also no change timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, timestamp2) # let's introduce another record txt3 = ['3'] self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago) self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago) timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp3, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp1, n_days_ago) self.assert_timestamps_equal(timestamp2, n_days_ago) self.ldap_delete_record(name, txt3) timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp self.assert_soon_after(timestamp3, current_time) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp self.assert_timestamps_equal(timestamp1, n_days_ago) self.assert_timestamps_equal(timestamp2, n_days_ago) txt4 = ['4'] # Because txt1 is static, txt4 is static self.ldap_update_record(name, txt1, dwTimeStamp=0) self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago) self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp self.assert_timestamps_equal(timestamp1, 0) self.assert_timestamps_equal(timestamp2, n_days_ago) self.assert_timestamps_equal(timestamp3, n_days_ago) self.assert_timestamps_equal(timestamp4, 0) longer_ago = n_days_ago // 2 # remove all static records. self.ldap_delete_record(name, txt4) self.ldap_update_record(name, txt1, dwTimeStamp=longer_ago) self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago) self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago) timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp self.assert_timestamps_equal(timestamp1, longer_ago) timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp # Here, although there is no record frm which to get the zero # timestamp, record 4 does it anyway. self.assert_timestamps_equal(timestamp1, longer_ago) self.assert_timestamps_equal(timestamp2, n_days_ago) self.assert_timestamps_equal(timestamp3, n_days_ago) self.assert_timestamps_equal(timestamp4, 0) # and now record 1 wants to be static. self.ldap_update_record(name, txt4, dwTimeStamp=longer_ago) timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp self.assert_timestamps_equal(timestamp4, longer_ago) timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp self.assert_timestamps_equal(timestamp1, 0) self.assert_timestamps_equal(timestamp4, longer_ago) def test_update_aging_enabled_in_no_refresh_window(self): self._test_update_aging_enabled_n_days_ago(4) def test_update_aging_enabled_on_no_refresh_boundary(self): self._test_update_aging_enabled_n_days_ago(7) def test_update_aging_enabled_in_refresh_window(self): self._test_update_aging_enabled_n_days_ago(9) def test_update_aging_enabled_beyond_refresh_window(self): self._test_update_aging_enabled_n_days_ago(16) def test_update_aging_enabled_in_eighteenth_century(self): self._test_update_aging_enabled_n_days_ago(100000) def test_update_static_stickiness(self): name = 'test' A = ['A'] B = ['B'] C = ['C'] D = ['D'] self.set_aging(False) self.dns_update_record(name, A).dwTimeStamp self.ldap_update_record(name, B, dwTimeStamp=0) self.dns_update_record(name, B) self.dns_update_record(name, C) ctime = self.get_unique_txt_record(name, C).dwTimeStamp self.assertEqual(ctime, 0) btime = self.get_unique_txt_record(name, B).dwTimeStamp self.assertEqual(btime, 0) self.ldap_replace_records(name, []) self.dns_update_record(name, D) dtime = self.get_unique_txt_record(name, D).dwTimeStamp self.assertEqual(dtime, 0) def _test_update_timestamp_weirdness(self, n_days, aging=True): name = 'test' A = ['A'] B = ['B'] self.set_aging(aging) current_time = self.dns_update_record(name, A).dwTimeStamp # rewind timestamp using ldap self.ldap_modify_timestamps(name, n_days * -24) n_days_ago = self.get_unique_txt_record(name, A).dwTimeStamp time_A = self.dns_update_record(name, A).dwTimeStamp # that dns_update should have reset the timestamp ONLY if # aging is on and the old timestamp is > noRefresh period (7 # days) if n_days > 7 and aging: self.assert_soon_after(time_A, current_time) else: self.assert_timestamps_equal(time_A, n_days_ago) # add another record, which should have the current timestamp time_B = self.dns_update_record(name, B).dwTimeStamp self.assert_soon_after(time_B, current_time) time_A = self.get_unique_txt_record(name, A).dwTimeStamp if aging and n_days <= 7: self.assert_timestamps_equal(time_A, n_days_ago) else: self.assert_soon_after(time_A, current_time) # delete B, try again self.ldap_delete_record(name, B) self.ldap_update_record(name, A, dwTimeStamp=n_days_ago) time_A = self.dns_update_record(name, A).dwTimeStamp # here we are re-adding the deleted record time_B = self.dns_update_record(name, B).dwTimeStamp self.assert_soon_after(time_B, current_time) time_A = self.get_unique_txt_record(name, A).dwTimeStamp return n_days_ago, time_A, time_B def test_update_timestamp_weirdness_no_refresh_no_aging(self): n_days_ago, time_A, time_B = \ self._test_update_timestamp_weirdness(5, False) # the timestamp of the SIBLING of the deleted, re-added record # differs from the sibling of freshly added record. self.assert_timestamps_equal(time_A, n_days_ago) def test_update_timestamp_weirdness_no_refresh_aging(self): n_days_ago, time_A, time_B = \ self._test_update_timestamp_weirdness(5, True) # the timestamp of the SIBLING of the deleted, re-added record # differs from the sibling of freshly added record. self.assert_timestamps_equal(time_A, n_days_ago) def test_update_timestamp_weirdness_refresh_no_aging(self): n_days_ago, time_A, time_B = \ self._test_update_timestamp_weirdness(9, False) self.assert_timestamps_equal(time_A, time_B) def test_update_timestamp_weirdness_refresh_aging(self): n_days_ago, time_A, time_B = \ self._test_update_timestamp_weirdness(9, True) self.assert_timestamps_equal(time_A, time_B) def test_aging_refresh(self): name, txt = 'agingtest', ['test txt'] no_refresh = 200 refresh = 160 self.set_zone_int_params(NoRefreshInterval=no_refresh, RefreshInterval=refresh, Aging=1) before_mod = self.dns_update_record(name, txt) start_time = before_mod.dwTimeStamp # go back 86 hours, which is in the no-refresh time (but # wouldn't be if we had stuck to the default of 168). self.ldap_modify_timestamps(name, -170) rec = self.dns_update_record(name, txt) self.assert_timestamps_equal(rec.dwTimeStamp, start_time - 170) # back to -202 hours, into the refresh zone # the update should reset the timestamp to now. self.ldap_modify_timestamps(name, -32) rec = self.dns_update_record(name, txt) self.assert_soon_after(rec.dwTimeStamp, start_time) # back to -362 hours, beyond the end of the refresh period. # Actually nothing changes at this time -- we can still # refresh, but the record is liable for scavenging. self.ldap_modify_timestamps(name, -160) rec = self.dns_update_record(name, txt) self.assert_soon_after(rec.dwTimeStamp, start_time) def test_add_no_timestamp(self): # check zero timestamp is implicit self.set_aging(True) rec = self.ldap_update_record('ldap', 'test') self.assertEqual(rec.dwTimeStamp, 0) rec = self.rpc_update_record('rpc', 'test') self.assertEqual(rec.dwTimeStamp, 0) def test_add_zero_timestamp(self): rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=0) self.assertEqual(rec.dwTimeStamp, 0) rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=0) self.assertEqual(rec.dwTimeStamp, 0) def test_add_update_timestamp(self): # LDAP can change timestamp, RPC can't rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=123456) self.assertEqual(rec.dwTimeStamp, 123456) rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456) self.assertEqual(rec.dwTimeStamp, 0) # second time is a different code path (add vs update) rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456) self.assertEqual(rec.dwTimeStamp, 0) # RPC update the one with timestamp, zeroing it. rec = self.rpc_update_record('ldap', 'test', dwTimeStamp=123456) self.assertEqual(rec.dwTimeStamp, 0) def test_add_update_ttl(self): # RPC *can* set dwTtlSeconds. rec = self.ldap_update_record('ldap', 'test', dwTtlSeconds=1234) self.assertEqual(rec.dwTtlSeconds, 1234) rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234) self.assertEqual(rec.dwTtlSeconds, 1234) # does update work like add? rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321) self.assertEqual(rec.dwTtlSeconds, 4321) rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678) self.assertEqual(rec.dwTtlSeconds, 5678) def test_add_update_ttl_serial(self): # when setting dwTtlSeconds, what happens to serial number? rec = self.ldap_update_record('ldap', 'test', dwTtlSeconds=1234, dwSerial=123) self.assertEqual(rec.dwTtlSeconds, 1234) self.assertEqual(rec.dwSerial, 123) rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234) self.assertEqual(rec.dwTtlSeconds, 1234) serial = rec.dwSerial self.assertLess(serial, 4) rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321) self.assertEqual(rec.dwTtlSeconds, 4321) self.assertEqual(rec.dwSerial, serial + 1) rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678) self.assertEqual(rec.dwTtlSeconds, 5678) self.assertEqual(rec.dwSerial, 124) def test_add_update_dwFlags(self): # dwFlags splits into rank and flags. # according to [MS-DNSP] 2.3.2.2, flags MUST be zero rec = self.ldap_update_record('ldap', 'test', flags=22222, rank=222) self.assertEqual(rec.flags, 22222) self.assertEqual(rec.rank, 222) rec = self.rpc_update_record('ldap', 'test', dwFlags=3333333) # rank != 3333333 & 0xff == 213 self.assertEqual(rec.rank, 240) # RPC fixes rank self.assertEqual(rec.flags, 0) self.assertRaises(OverflowError, self.ldap_update_record, 'ldap', 'test', flags=777777777, rank=777) # reset to no default (rank overflows) rec = self.ldap_update_record('ldap', 'test', flags=7777, rank=777) self.assertEqual(rec.flags, 7777) self.assertEqual(rec.rank, 9) # DNS update zeros flags, sets rank to 240 (RANK_ZONE) rec = self.dns_update_record('ldap', 'test', ttl=999) self.assertEqual(rec.flags, 0) self.assertEqual(rec.rank, 240) rec = self.rpc_update_record('ldap', 'test', dwFlags=321) self.assertEqual(rec.flags, 0) self.assertEqual(rec.rank, 240) # RPC adding a new record: fixed rank, zero flags rec = self.rpc_update_record('ldap', 'test 2', dwFlags=12345) self.assertEqual(rec.rank, 240) self.assertEqual(rec.flags, 0) def test_add_update_dwReserved(self): # RPC does not change dwReserved. rec = self.ldap_update_record('ldap', 'test', dwReserved=54321) self.assertEqual(rec.dwReserved, 54321) rec = self.rpc_update_record('rpc', 'test', dwReserved=54321) self.assertEqual(rec.dwReserved, 0) rec = self.rpc_update_record('rpc', 'test', dwReserved=54321) self.assertEqual(rec.dwReserved, 0) rec = self.rpc_update_record('ldap', 'test', dwReserved=12345) self.assertEqual(rec.dwReserved, 54321) def test_add_update_dwSerial(self): # On Windows the RPC record ends up with serial 2, on Samba # serial 3. Rather than knownfail this, we accept anything # below 4 (for now). rec = self.ldap_update_record('ldap', 'test', dwSerial=123) self.assertEqual(rec.dwSerial, 123) rec = self.rpc_update_record('rpc', 'test', dwSerial=123) self.assertLess(rec.dwSerial, 4) rec = self.rpc_update_record('rpc', 'test', dwSerial=123) self.assertLess(rec.dwSerial, 4) rec = self.dns_update_record('rpc', 'test') self.assertLess(rec.dwSerial, 4) rec = self.dns_update_record('dns-0', 'test') self.assertLess(rec.dwSerial, 5) rec = self.dns_update_record('ldap', 'test') self.assertEqual(rec.dwSerial, 123) rec = self.rpc_update_record('ldap', 'test', dwSerial=123) self.assertEqual(rec.dwSerial, 123) rec = self.ldap_update_record('ldap', 'test', dwSerial=12) self.assertEqual(rec.dwSerial, 12) # when we dns-updated ldap/test, we alerted Windows to 123 as # a high water mark for the zone. (even though we have since # dropped the serial to 12, 123 is the base serial for new # records). rec = self.dns_update_record('dns', 'test') self.assertEqual(rec.dwSerial, 124) rec = self.dns_update_record('dns2', 'test') self.assertEqual(rec.dwSerial, 125) rec = self.rpc_update_record('rpc2', 'test') self.assertEqual(rec.dwSerial, 126) rec = self.dns_update_record('dns', 'test 2') self.assertEqual(rec.dwSerial, 127) def test_add_update_dwSerial_2(self): # On Samba the RPC update resets the serial to a low number, # while Windows leaves it high. rec = self.ldap_update_record('ldap', 'test', dwSerial=123) self.assertEqual(rec.dwSerial, 123) rec = self.rpc_update_record('ldap', 'test', dwSerial=321) self.assertEqual(rec.dwSerial, 123) rec = self.dns_update_record('ldap', 'test') self.assertEqual(rec.dwSerial, 123) def test_rpc_update_disparate_types(self): """Can we use update to replace a TXT with an AAAA?""" name = 'x' old = TXTRecord("x") new = ARecord("127.0.0.111") self.rpc_replace(name, None, old) recs = self.ldap_get_records(name) self.assertEqual(len(recs), 1) self.assertEqual(recs[0].wType, old.wType) self.rpc_replace(name, old, new) recs = self.ldap_get_records(name) self.assertEqual(len(recs), 1) self.assertEqual(recs[0].wType, new.wType) def test_add_update_many(self): # Samba fails often in this set, but we want to see how it # goes further down, so we print the problems and defer the # failure. failures = 0 total = 0 def _defer_wrap(f): def _defer(*args): nonlocal failures, total total += 1 try: f(*args) except self.failureException as e: from traceback import format_stack print(f"{format_stack()[-2]} {e}\n") failures += 1 return _defer defer_assertEqual = _defer_wrap(self.assertEqual) defer_assert_timestamp_in_ballpark = \ _defer_wrap(self.assert_timestamp_in_ballpark) self.set_aging(False) rec = self.ldap_update_record('ldap', 'test', version=11, rank=22, flags=33, dwSerial=44, dwTtlSeconds=55, dwReserved=66, dwTimeStamp=77) self.assertEqual(rec.version, 5) # disobeys request self.assertEqual(rec.rank, 22) self.assertEqual(rec.flags, 33) self.assertEqual(rec.dwSerial, 44) self.assertEqual(rec.dwTtlSeconds, 55) self.assertEqual(rec.dwReserved, 66) self.assertEqual(rec.dwTimeStamp, 77) # DNS updates first rec = self.dns_update_record('ldap', 'test', ttl=999) self.assertEqual(rec.version, 5) self.assertEqual(rec.rank, 240) # rank gets fixed by DNS update defer_assertEqual(rec.flags, 0) # flags gets fixed defer_assertEqual(rec.dwSerial, 45) # serial increments self.assertEqual(rec.dwTtlSeconds, 999) # TTL set defer_assertEqual(rec.dwReserved, 0) # reserved fixed defer_assert_timestamp_in_ballpark(rec) # changed on Windows ?! self.set_aging(True) rec = self.dns_update_record('ldap', 'test', ttl=1111) self.assertEqual(rec.version, 5) self.assertEqual(rec.rank, 240) defer_assertEqual(rec.flags, 0) defer_assertEqual(rec.dwSerial, 46) self.assertEqual(rec.dwTtlSeconds, 1111) # TTL set defer_assertEqual(rec.dwReserved, 0) self.assert_timestamp_in_ballpark(rec) # RPC update rec = self.rpc_update_record('ldap', 'test', version=111, dwFlags=333, dwSerial=444, dwTtlSeconds=555, dwReserved=666, dwTimeStamp=777) self.assertEqual(rec.version, 5) # no change self.assertEqual(rec.rank, 240) # no change defer_assertEqual(rec.flags, 0) # no change defer_assertEqual(rec.dwSerial, 47) # Serial increments self.assertEqual(rec.dwTtlSeconds, 555) # TTL set defer_assertEqual(rec.dwReserved, 0) # no change self.assertEqual(rec.dwTimeStamp, 0) # timestamp zeroed # RPC update, using default values rec = self.rpc_update_record('ldap', 'test') self.assertEqual(rec.version, 5) self.assertEqual(rec.rank, 240) defer_assertEqual(rec.flags, 0) defer_assertEqual(rec.dwSerial, 48) # serial increments self.assertEqual(rec.dwTtlSeconds, 900) # TTL changed defer_assertEqual(rec.dwReserved, 0) self.assertEqual(rec.dwTimeStamp, 0) self.set_aging(False) rec = self.dns_update_record('ldap', 'test', ttl=888) self.assertEqual(rec.version, 5) self.assertEqual(rec.rank, 240) defer_assertEqual(rec.flags, 0) defer_assertEqual(rec.dwSerial, 49) # serial increments self.assertEqual(rec.dwTtlSeconds, 888) # TTL set defer_assertEqual(rec.dwReserved, 0) self.assertEqual(rec.dwTimeStamp, 0) # timestamp stays zero if failures: self.fail(f"failed {failures}/{total} defered assertions") def test_static_record_dynamic_update(self): """Add a static record, then a dynamic record. The dynamic record should have a timestamp set.""" name = 'test' txt = ['static txt'] txt2 = ['dynamic txt'] self.set_aging(True) rec = self.ldap_update_record(name, txt, dwTimeStamp=0) rec2 = self.dns_update_record(name, txt2) self.assert_timestamp_in_ballpark(rec2) ts2 = rec2.dwTimeStamp # update the first record. It should stay static (timestamp 0) rec = self.dns_update_record(name, txt) self.assertEqual(rec.dwTimeStamp, 0) # and rec2 should be unchanged. self.assertEqual(rec2.dwTimeStamp, ts2) def test_dynamic_record_static_update(self): name = 'agingtest' txt1 = ['dns update before'] txt2 = ['ldap update'] txt3 = ['dns update after'] self.set_aging(True) self.dns_update_record(name, txt1) self.ldap_update_record(name, txt2) self.dns_update_record(name, txt3) recs = self.get_rpc_records(name) for r in recs: d = [x.str for x in r.data.str] if d == txt1: self.assertNotEqual(r.dwTimeStamp, 0) elif d == txt2: self.assertEqual(r.dwTimeStamp, 0) elif d == txt3: self.assertNotEqual(r.dwTimeStamp, 0) def test_tombstone_in_hours_and_nttime(self): # Until now Samba has measured tombstone timestamps in hours, # not ten-millionths of a second. After now, we want Samba to # handle both. nh, oh, nn, on, on0, onf, nn0, nnf, _1601 = 'abcdefgij' now_hours = dsdb_dns.unix_to_dns_timestamp(int(time.time())) old_hours = now_hours - 24 * 90 now_nttime = dsdb_dns.dns_timestamp_to_nt_time(now_hours) old_nttime = dsdb_dns.dns_timestamp_to_nt_time(old_hours) # calculations on hours might be based on the lower 32 bits, # so we test with these forced to extremes (the maximum change # is 429 seconds in NTTIME). old_nttime0 = old_nttime & 0xffffffff00000000 old_nttimef = old_nttime | 0xffffffff now_nttime0 = now_nttime & 0xffffffff00000000 now_nttimef = now_nttime | 0xffffffff self.dns_tombstone(nh, epoch_nttime=now_hours) self.dns_tombstone(oh, epoch_nttime=old_hours) self.dns_tombstone(nn, epoch_nttime=now_nttime) self.dns_tombstone(on, epoch_nttime=old_nttime) self.dns_tombstone(nn0, epoch_nttime=now_nttime0) self.dns_tombstone(nnf, epoch_nttime=now_nttimef) self.dns_tombstone(on0, epoch_nttime=old_nttime0) self.dns_tombstone(onf, epoch_nttime=old_nttimef) # this is our (arbitrary) threshold that will make us think in # NTTIME, not hours. self.dns_tombstone(_1601, epoch_nttime=(10 * 1000 * 1000 + 1)) try: file_samdb = get_file_samdb() except ldb.LdbError as e: raise AssertionError( f"failing because '{e}': this is Windows?") from None dsdb._dns_delete_tombstones(file_samdb) # nh and nn should not be deleted for name in nh, nn, nn0, nnf: recs = self.ldap_get_records(name) self.assertEqual(len(recs), 1) self.assert_tombstoned(name, timestamp=False) # oh and on should be GONE for name in oh, on, on0, onf, _1601: recs = self.ldap_get_records(name) self.assertEqual(len(recs), 0) def test_dns_query_for_tombstoned_results(self): # This one fails on Windows, because the dns cache holds B # after it has been tombstoned behind its back. A = 'a' B = 'b' self.dns_tombstone(A) self.assert_tombstoned(A) r = self.dns_query(A, qtype=dns.DNS_QTYPE_TXT) self.assertEqual(r.ancount, 0) self.dns_update_record(B, B) self.dns_tombstone(B) self.assert_tombstoned(B) r = self.dns_query(B, qtype=dns.DNS_QTYPE_TXT) self.assertEqual(r.ancount, 0) def test_basic_scavenging(self): # NOTE: This one fails on Windows, because the RPC call to # prompt scavenging is not immediate. On Samba, in the # testenv, we don't have the RPC call but we can connect to # the database directly. # just to be sure we have the right limits. self.set_zone_int_params(NoRefreshInterval=168, RefreshInterval=168, Aging=1) ts1, ts2, ts3, ts4, ts5, ts6 = ('1', '2', '3', '4', '5', '6') self.dns_update_record(ts1, ts1) self.dns_update_record(ts2, ts2) # ts2 is tombstoned and timestamped in 1981 self.dns_tombstone(ts2) # ts3 is tombstoned and timestamped in the future self.dns_tombstone(ts3, epoch_hours=(DNS_TIMESTAMP_2101 - 1)) # ts4 is tombstoned and timestamped in the past self.dns_tombstone(ts4, epoch_hours=1111111) # ts5 is tombstoned in the past and timestamped in the future self.dns_tombstone(ts5, epoch_hours=5555555, epoch_nttime=int(1e10)) # ts2 and ts3 should now be tombstoned. self.assert_tombstoned(ts2) self.assert_tombstoned(ts3) # let's un-tombstone ts2 # ending up with dnsTombstoned: FALSE in Samba # and no dNSTombstoned in Windows. self.dns_update_record(ts2, "ts2 untombstoned") ts2_node = self.get_one_node(ts2) ts2_tombstone = ts2_node.get("dNSTombstoned") if ts2_tombstone is not None: self.assertEqual(ts2_tombstone[0], b"FALSE") self.assert_tombstoned(ts2, tombstoned=False) r = self.dns_update_record(ts6, ts6) # put some records into the death zone. self.ldap_modify_timestamps(ts1, -15 * 24) self.ldap_modify_timestamps(ts2, -14 * 24 - 2) self.ldap_modify_timestamps(ts6, -14 * 24 + 2) # ts1 will be saved by this record self.dns_update_record(ts1, "another record") try: # Tell the server to clean-up records. # This is how it *should* work on Windows: self.rpc_conn.DnssrvOperation2( dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, SERVER_IP, None, 0, "StartScavenging", dnsserver.DNSSRV_TYPEID_NULL, None) # Samba won't get here (NOT_IMPLEMENTED error) # wait for Windows to do its cleanup. time.sleep(2) except WERRORError as e: if e.args[0] == werror.WERR_CALL_NOT_IMPLEMENTED: # This is the Samba way, talking to the file directly, # as if we were the server process. The direct # connection is needed because the tombstoning search # involves a magic system only filter. file_samdb = get_file_samdb() dsdb._scavenge_dns_records(file_samdb) dsdb._dns_delete_tombstones(file_samdb) else: raise # Now what we should have: # ts1: alive: the old record is deleted, the new one not. # ts2: tombstoned # ts3: tombstoned # ts4: deleted. gone. # ts5: deleted. timestamp affects tombstoning, but not deletion. # ts6: alive # # We order our assertions to make the windows test # fail as late as possible (on ts4, ts5, ts2). r = self.get_unique_txt_record(ts1, ["another record"]) self.assertIsNotNone(r) r = self.get_unique_txt_record(ts6, [ts6]) self.assertIsNotNone(r) self.assert_tombstoned(ts3) n = self.get_one_node(ts4) self.assertIsNone(n) n = self.get_one_node(ts5) self.assertIsNone(n) self.assert_tombstoned(ts2) def test_samba_scavenging(self): # We expect this one to fail on Windows, because scavenging # and tombstoning cannot be performed on demand. try: file_samdb = get_file_samdb() except ldb.LdbError as e: raise AssertionError( f"failing because '{e}': this is Windows?") from None # let's try different limits. self.set_zone_int_params(NoRefreshInterval=30, RefreshInterval=20, Aging=1) now = dsdb_dns.unix_to_dns_timestamp(int(time.time())) A, B, C, D = 'ABCD' # A has current time # B has safe, non-updateable time # C has safe time # D is scavengeable atime = self.dns_update_record(A, A).dwTimeStamp btime = self.ldap_update_record(B, B, dwTimeStamp=now-20).dwTimeStamp ctime = self.ldap_update_record(C, C, dwTimeStamp=now-40).dwTimeStamp dtime = self.ldap_update_record(D, D, dwTimeStamp=now-60).dwTimeStamp self.assert_soon_after(atime, now) self.assert_timestamps_equal(btime, now-20) self.assert_timestamps_equal(ctime, now-40) self.assert_timestamps_equal(dtime, now-60) dsdb._scavenge_dns_records(file_samdb) # D should be gone (tombstoned) r = self.get_unique_txt_record(D, D) self.assertIsNone(r) r = self.dns_query(D, qtype=dns.DNS_QTYPE_TXT) self.assertEqual(r.ancount, 0) recs = self.ldap_get_records(D) self.assertEqual(len(recs), 1) self.assert_tombstoned(recs[0]) # others unchanged. atime = self.get_unique_txt_record(A, A).dwTimeStamp btime = self.get_unique_txt_record(B, B).dwTimeStamp ctime = self.get_unique_txt_record(C, C).dwTimeStamp self.assert_soon_after(atime, now) self.assert_timestamps_equal(btime, now-20) self.assert_timestamps_equal(ctime, now-40) btime = self.dns_update_record(B, B).dwTimeStamp ctime = self.dns_update_record(C, C).dwTimeStamp self.assert_timestamps_equal(btime, now-40) self.assert_soon_after(ctime, now) # after this, D *should* still be a tombstone, because its # tombstone timestamp is not very old. dsdb._dns_delete_tombstones(file_samdb) recs = self.ldap_get_records(D) self.assertEqual(len(recs), 1) self.assert_tombstoned(recs[0]) # Let's delete C using rpc, and ensure it survives dns_delete_tombstones self.rpc_delete_txt(C, C) recs = self.ldap_get_records(C) self.assertEqual(len(recs), 1) self.assert_tombstoned(recs[0]) dsdb._dns_delete_tombstones(file_samdb) recs = self.ldap_get_records(C) self.assertEqual(len(recs), 1) self.assert_tombstoned(recs[0]) # now let's wind A and B back to either side of the two week # threshold. A should survive, B should not. self.dns_tombstone(A, (now - 166)) self.dns_tombstone(B, (now - 170)) dsdb._dns_delete_tombstones(file_samdb) recs = self.ldap_get_records(A) self.assertEqual(len(recs), 1) self.assert_tombstoned(recs[0]) recs = self.ldap_get_records(B) self.assertEqual(len(recs), 0) def _test_A_and_AAAA_records(self, A, B, a_days, b_days, aging): self.set_aging(aging) name = 'aargh' now = dsdb_dns.unix_to_dns_timestamp(int(time.time())) a_initial = now - 24 * a_days b_initial = now - 24 * b_days self.dns_update_non_text(name, A) self.ldap_modify_timestamps(name, a_days * -24) rec_a = self.get_unique_ip_record(name, A) rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial) self.assert_timestamps_equal(rec_a, a_initial) self.assert_timestamps_equal(rec_b, b_initial) # touch the A record. self.dns_update_non_text(name, A) # check the A timestamp, depending on norefresh rec_a = self.get_unique_ip_record(name, A) if aging and a_days > 7: time_a = now self.assert_soon_after(rec_a, now) elif a_days > 7: # when we have NO aging and are in the refresh window, the # timestamp now reads as a_initial, but will become now # after we manipulate B for a bit. time_a = now self.assert_timestamps_equal(rec_a, a_initial) else: time_a = a_initial self.assert_timestamps_equal(rec_a, a_initial) # B timestamp should be unchanged? rec_b = self.get_unique_ip_record(name, B) self.assert_timestamps_equal(rec_b, b_initial) # touch the B record. self.dns_update_non_text(name, B) # check the B timestamp rec_b = self.get_unique_ip_record(name, B) if not aging: self.windows_variation( self.assert_soon_after, rec_b, now, msg="windows updates non-aging, samba does not") else: self.assert_soon_after(rec_b, now) # rewind B rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial) # NOW rec A might have changed! with no aging, and out of refresh. rec_a = self.get_unique_ip_record(name, A) self.assert_timestamps_equal(rec_a, time_a) self.dns_update_non_text(name, A) rec_a = self.get_unique_ip_record(name, B) self.assert_timestamps_equal(rec_b, b_initial) # now delete A _, wtype = guess_wtype(A) self.ldap_delete_record(name, A, wtype=wtype) # re-add it self.dns_update_non_text(name, A) rec_a = self.get_unique_ip_record(name, A) self.assert_soon_after(rec_a, now) rec_b = self.get_unique_ip_record(name, B) self.assert_timestamps_equal(rec_b, b_initial) def test_A_5_days_AAAA_5_days_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=True) def test_A_5_days_AAAA_5_days_no_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=False) def test_A_5_days_AAAA_10_days_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=True) def test_A_5_days_AAAA_10_days_no_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=False) def test_A_10_days_AAAA_5_days_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=True) def test_A_10_days_AAAA_5_days_no_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=False) def test_A_10_days_AAAA_9_days_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 9, aging=True) def test_A_9_days_AAAA_10_days_no_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 9, 10, aging=False) def test_A_20_days_AAAA_2_days_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 20, 2, aging=True) def test_A_6_days_AAAA_40_days_no_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 6, 40, aging=False) def test_A_5_days_A_5_days_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 5, aging=True) def test_A_5_days_A_10_days_no_aging(self): self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 10, aging=False) def test_AAAA_5_days_AAAA_6_days_aging(self): self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=True) def test_AAAA_5_days_AAAA_6_days_no_aging(self): self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=False) def _test_multi_records_delete(self, aging): # Batch deleting a type doesn't update other types timestamps. self.set_aging(aging) name = 'aargh' now = dsdb_dns.unix_to_dns_timestamp(int(time.time())) back_5_days = now - 5 * 24 back_10_days = now - 10 * 24 back_25_days = now - 25 * 24 ip4s = { '1.1.1.1': now, '2.2.2.2': back_5_days, '3.3.3.3': back_10_days, } ip6s = { '::1': now, '::2': back_5_days, '::3': back_25_days, } txts = { '1': now, '2': back_5_days, '3': back_25_days, } # For windows, if we don't DNS update something, it won't know # there's anything. self.dns_update_record(name, '3') for k, v in ip4s.items(): r = self.add_ip_record(name, k, wtype=dns.DNS_QTYPE_A, dwTimeStamp=v) for k, v in ip6s.items(): r = self.add_ip_record(name, k, wtype=dns.DNS_QTYPE_AAAA, dwTimeStamp=v) for k, v in txts.items(): r = self.ldap_update_record(name, k, dwTimeStamp=v) self.dns_delete_type(name, dnsp.DNS_TYPE_A) r = self.dns_query(name, dns.DNS_QTYPE_A) self.assertEqual(r.ancount, 0) r = self.dns_query(name, dns.DNS_QTYPE_TXT) self.assertEqual(r.ancount, 3) rset = set(x.rdata.txt.str[0] for x in r.answers) self.assertEqual(rset, set(txts)) r = self.dns_query(name, dns.DNS_QTYPE_AAAA) self.assertEqual(r.ancount, 3) rset = set(ipv6_normalise(x.rdata) for x in r.answers) self.assertEqual(rset, set(ip6s)) recs = self.ldap_get_records(name) self.assertEqual(len(recs), 6) for r in recs: if r.wType == dns.DNS_QTYPE_AAAA: k = ipv6_normalise(r.data) expected = ip6s[k] elif r.wType == dns.DNS_QTYPE_TXT: k = r.data.str[0] expected = txts[k] else: self.fail(f"unexpected wType {r.wType}") self.assert_timestamps_equal(r.dwTimeStamp, expected) def test_multi_records_delete_aging(self): self._test_multi_records_delete(True) def test_multi_records_delete_no_aging(self): self._test_multi_records_delete(False) def _test_dns_delete_times(self, n_days, aging=True): # In these tests, Windows replaces the records with # tombstones, while Samba just removes them. Both are # reasonable approaches (there is no reanimation pathway for # tombstones), but this means self.ldap_get_records() gets # different numbers for each. So we use # self.ldap_get_non_tombstoned_record(). name = 'test' A = ['A'] B = ['B'] C = ['C'] D = ['D'] self.set_aging(aging) now = dsdb_dns.unix_to_dns_timestamp(int(time.time())) n_days_ago = max(now - n_days * 24, 0) self.dns_update_record(name, A) self.ldap_update_record(name, A, dwTimeStamp=n_days_ago) self.ldap_update_record(name, B, dwTimeStamp=n_days_ago) self.ldap_update_record(name, C, dwTimeStamp=n_days_ago) self.dns_update_record(name, D) r = self.dns_query(name, dns.DNS_QTYPE_TXT) rset = set(x.rdata.txt.str[0] for x in r.answers) self.assertEqual(rset, set('ABCD')) atime = self.get_unique_txt_record(name, A).dwTimeStamp btime = self.get_unique_txt_record(name, B).dwTimeStamp ctime = self.get_unique_txt_record(name, C).dwTimeStamp dtime = self.get_unique_txt_record(name, D).dwTimeStamp recs = self.ldap_get_records(name) self.assertEqual(len(recs), 4) r = self.dns_query(name, dns.DNS_QTYPE_TXT) rset = set(x.rdata.txt.str[0] for x in r.answers) self.assertEqual(rset, set('ABCD')) self.dns_delete(name, D) self.assert_timestamps_equal(atime, self.get_unique_txt_record(name, A)) self.assert_timestamps_equal(btime, self.get_unique_txt_record(name, B)) self.assert_timestamps_equal(ctime, self.get_unique_txt_record(name, C)) recs = self.ldap_get_non_tombstoned_records(name) self.assertEqual(len(recs), 3) r = self.dns_query(name, dns.DNS_QTYPE_TXT) rset = set(x.rdata.txt.str[0] for x in r.answers) self.assertEqual(rset, set('ABC')) self.rpc_delete_txt(name, C) self.assert_timestamps_equal(atime, self.get_unique_txt_record(name, A)) self.assert_timestamps_equal(btime, self.get_unique_txt_record(name, B)) recs = self.ldap_get_non_tombstoned_records(name) self.assertEqual(len(recs), 2) r = self.dns_query(name, dns.DNS_QTYPE_TXT) rset = set(x.rdata.txt.str[0] for x in r.answers) self.assertEqual(rset, set('AB')) self.dns_delete(name, A) self.assert_timestamps_equal(btime, self.get_unique_txt_record(name, B)) recs = self.ldap_get_records(name) self.assertEqual(len(recs), 1) r = self.dns_query(name, dns.DNS_QTYPE_TXT) rset = set(x.rdata.txt.str[0] for x in r.answers) self.assertEqual(rset, {'B'}) self.dns_delete(name, B) recs = self.ldap_get_non_tombstoned_records(name) # Windows leaves the node with zero records. Samba ends up # with a tombstone. self.assertEqual(len(recs), 0) r = self.dns_query(name, dns.DNS_QTYPE_TXT) rset = set(x.rdata.txt.str[0] for x in r.answers) self.assertEqual(len(rset), 0) def test_dns_delete_times_5_days_aging(self): self._test_dns_delete_times(5, True) def test_dns_delete_times_11_days_aging(self): self._test_dns_delete_times(11, True) def test_dns_delete_times_366_days_aging(self): self._test_dns_delete_times(366, True) def test_dns_delete_times_static_aging(self): self._test_dns_delete_times(1e10, True) def test_dns_delete_times_5_days_no_aging(self): self._test_dns_delete_times(5, False) def test_dns_delete_times_11_days_no_aging(self): self._test_dns_delete_times(11, False) def test_dns_delete_times_366_days_no_aging(self): self._test_dns_delete_times(366, False) def test_dns_delete_times_static_no_aging(self): self._test_dns_delete_times(1e10, False) def _test_dns_delete_simple(self, a_days, b_days, aging=True, touch=False): # Here we show that with aging enabled, the timestamp of # sibling records is *not* modified when a record is deleted. # # With aging disabled, it *is* modified, if the dns server has # seen it updated before ldap set the time (that is, probably # the dns server overwrites AD). This happens even if AD # thinks the record is static. name = 'test' A = ['A'] B = ['B'] self.set_aging(aging) now = dsdb_dns.unix_to_dns_timestamp(int(time.time())) a_days_ago = max(now - a_days * 24, 0) b_days_ago = max(now - b_days * 24, 0) if touch: self.dns_update_record(name, A) self.dns_update_record(name, B) self.ldap_update_record(name, A, dwTimeStamp=a_days_ago) self.ldap_update_record(name, B, dwTimeStamp=b_days_ago) atime = self.get_unique_txt_record(name, A).dwTimeStamp self.dns_delete(name, B) if not aging and touch: # this resets the timestamp even if it is a static record. self.assert_soon_after(self.get_unique_txt_record(name, A), now) else: self.assert_timestamps_equal(self.get_unique_txt_record(name, A), atime) def test_dns_delete_simple_2_3_days_aging(self): self._test_dns_delete_simple(2, 3, True) def test_dns_delete_simple_2_3_days_no_aging(self): self._test_dns_delete_simple(2, 3, False) def test_dns_delete_simple_2_13_days_aging(self): self._test_dns_delete_simple(2, 13, True) def test_dns_delete_simple_2_13_days_no_aging(self): self._test_dns_delete_simple(2, 13, False) def test_dns_delete_simple_12_13_days_aging(self): self._test_dns_delete_simple(12, 13, True) def test_dns_delete_simple_12_13_days_no_aging(self): self._test_dns_delete_simple(12, 13, False) def test_dns_delete_simple_112_113_days_aging(self): self._test_dns_delete_simple(112, 113, True) def test_dns_delete_simple_112_113_days_no_aging(self): self._test_dns_delete_simple(112, 113, False) def test_dns_delete_simple_0_113_days_aging(self): # 1e9 hours ago evaluates to 0, i.e static self._test_dns_delete_simple(1e9, 113, True) def test_dns_delete_simple_0_113_days_no_aging(self): self._test_dns_delete_simple(1e9, 113, False) def test_dns_delete_simple_0_0_days_aging(self): self._test_dns_delete_simple(1e9, 1e9, True) def test_dns_delete_simple_0_0_days_no_aging(self): self._test_dns_delete_simple(1e9, 1e9, False) def test_dns_delete_simple_10_0_days_aging(self): self._test_dns_delete_simple(10, 1e9, True) def test_dns_delete_simple_10_0_days_no_aging(self): self._test_dns_delete_simple(10, 1e9, False) def test_dns_delete_simple_2_3_days_aging_touch(self): self._test_dns_delete_simple(2, 3, True, True) def test_dns_delete_simple_2_3_days_no_aging_touch(self): self._test_dns_delete_simple(2, 3, False, True) def test_dns_delete_simple_2_13_days_aging_touch(self): self._test_dns_delete_simple(2, 13, True, True) def test_dns_delete_simple_2_13_days_no_aging_touch(self): self._test_dns_delete_simple(2, 13, False, True) def test_dns_delete_simple_12_13_days_aging_touch(self): self._test_dns_delete_simple(12, 13, True, True) def test_dns_delete_simple_12_13_days_no_aging_touch(self): self._test_dns_delete_simple(12, 13, False, True) def test_dns_delete_simple_112_113_days_aging_touch(self): self._test_dns_delete_simple(112, 113, True, True) def test_dns_delete_simple_112_113_days_no_aging_touch(self): self._test_dns_delete_simple(112, 113, False, True) def test_dns_delete_simple_0_113_days_aging_touch(self): # 1e9 hours ago evaluates to 0, i.e static self._test_dns_delete_simple(1e9, 113, True, True) def test_dns_delete_simple_0_113_days_no_aging_touch(self): self._test_dns_delete_simple(1e9, 113, False, True) def test_dns_delete_simple_0_0_days_aging_touch(self): self._test_dns_delete_simple(1e9, 1e9, True, True) def test_dns_delete_simple_0_0_days_no_aging_touch(self): self._test_dns_delete_simple(1e9, 1e9, False, True) def test_dns_delete_simple_10_0_days_aging_touch(self): self._test_dns_delete_simple(10, 1e9, True, True) def test_dns_delete_simple_10_0_days_no_aging_touch(self): self._test_dns_delete_simple(10, 1e9, False, True) def windows_variation(self, fn, *args, msg=None, **kwargs): try: fn(*args, **kwargs) except AssertionError as e: print("Expected success on Windows only, failed as expected:\n" + c_GREEN(e)) return print(c_RED("known Windows failure")) if msg is not None: print(c_DARK_YELLOW(msg)) print("Expected success on Windows:\n" + c_GREEN(f"{fn.__name__} {args} {kwargs}")) def _test_dns_add_sibling(self, a_days, refresh, aging=True, touch=False): # Here we show that with aging enabled, the timestamp of # sibling records *is* modified when a record is added. # # With aging disabled, it *is* modified, if the dns server has # seen it updated before ldap set the time (that is, probably # the dns server overwrites AD). This happens even if AD # thinks the record is static. name = 'test' A = ['A'] B = ['B'] self.set_zone_int_params(RefreshInterval=int(refresh), NoRefreshInterval=7, Aging=int(aging)) now = dsdb_dns.unix_to_dns_timestamp(int(time.time())) a_days_ago = max(now - a_days * 24, 0) if touch: self.dns_update_record(name, A) self.ldap_update_record(name, A, dwTimeStamp=a_days_ago) atime = self.get_unique_txt_record(name, A).dwTimeStamp self.dns_update_record(name, B) a_rec = self.get_unique_txt_record(name, A) if not aging and touch: # On Windows, this resets the timestamp even if it is a # static record, though in that case it may be a # transitory effect of the DNS cache. We will insist on # the Samba behaviour of not changing (that is # un-static-ing) a zero timestamp, because that is the # sensible thing. if a_days_ago == 0: self.windows_variation( self.assert_soon_after, a_rec, now, msg="Windows resets static siblings (cache effect?)") self.assert_timestamps_equal(a_rec, 0) else: self.assert_soon_after(a_rec, now) else: self.assert_timestamps_equal(a_rec, atime) b_rec = self.get_unique_txt_record(name, B) self.assert_soon_after(b_rec, now) def test_dns_add_sibling_2_7_days_aging(self): self._test_dns_add_sibling(2, 7, True) def test_dns_add_sibling_2_7_days_no_aging(self): self._test_dns_add_sibling(2, 7, False) def test_dns_add_sibling_12_7_days_aging(self): self._test_dns_add_sibling(12, 7, True) def test_dns_add_sibling_12_7_days_no_aging(self): self._test_dns_add_sibling(12, 7, False) def test_dns_add_sibling_12_3_days_aging(self): self._test_dns_add_sibling(12, 3, True) def test_dns_add_sibling_12_3_days_no_aging(self): self._test_dns_add_sibling(12, 3, False) def test_dns_add_sibling_112_7_days_aging(self): self._test_dns_add_sibling(112, 7, True) def test_dns_add_sibling_112_7_days_no_aging(self): self._test_dns_add_sibling(112, 7, False) def test_dns_add_sibling_12_113_days_aging(self): self._test_dns_add_sibling(12, 113, True) def test_dns_add_sibling_12_113_days_no_aging(self): self._test_dns_add_sibling(12, 113, False) def test_dns_add_sibling_0_7_days_aging(self): # 1e9 days ago evaluates to 0, i.e static self._test_dns_add_sibling(1e9, 7, True) def test_dns_add_sibling_0_7_days_no_aging(self): self._test_dns_add_sibling(1e9, 7, False) def test_dns_add_sibling_0_0_days_aging(self): self._test_dns_add_sibling(1e9, 0, True) def test_dns_add_sibling_0_0_days_no_aging(self): self._test_dns_add_sibling(1e9, 0, False) def test_dns_add_sibling_10_0_days_aging(self): self._test_dns_add_sibling(10, 0, True) def test_dns_add_sibling_10_0_days_no_aging(self): self._test_dns_add_sibling(10, 0, False) def test_dns_add_sibling_2_7_days_aging_touch(self): self._test_dns_add_sibling(2, 7, True, True) def test_dns_add_sibling_2_7_days_no_aging_touch(self): self._test_dns_add_sibling(2, 7, False, True) def test_dns_add_sibling_12_7_days_aging_touch(self): self._test_dns_add_sibling(12, 7, True, True) def test_dns_add_sibling_12_7_days_no_aging_touch(self): self._test_dns_add_sibling(12, 7, False, True) def test_dns_add_sibling_12_3_days_aging_touch(self): self._test_dns_add_sibling(12, 3, True, True) def test_dns_add_sibling_12_3_days_no_aging_touch(self): self._test_dns_add_sibling(12, 3, False, True) def test_dns_add_sibling_112_7_days_aging_touch(self): self._test_dns_add_sibling(112, 7, True, True) def test_dns_add_sibling_112_7_days_no_aging_touch(self): self._test_dns_add_sibling(112, 7, False, True) def test_dns_add_sibling_12_113_days_aging_touch(self): self._test_dns_add_sibling(12, 113, True, True) def test_dns_add_sibling_12_113_days_no_aging_touch(self): self._test_dns_add_sibling(12, 113, False, True) def test_dns_add_sibling_0_7_days_aging_touch(self): self._test_dns_add_sibling(1e9, 7, True, True) def test_dns_add_sibling_0_7_days_no_aging_touch(self): self._test_dns_add_sibling(1e9, 7, False, True) def test_dns_add_sibling_0_0_days_aging_touch(self): self._test_dns_add_sibling(1e9, 0, True, True) def test_dns_add_sibling_0_0_days_no_aging_touch(self): self._test_dns_add_sibling(1e9, 0, False, True) def test_dns_add_sibling_10_0_days_aging_touch(self): self._test_dns_add_sibling(10, 0, True, True) def test_dns_add_sibling_10_0_days_no_aging_touch(self): self._test_dns_add_sibling(10, 0, False, True) TestProgram(module=__name__, opts=subunitopts)