2021-04-28 17:40:08 +12:00
# Unix SMB/CIFS implementation.
# Copyright (C) Kai Blin <kai@samba.org> 2011
# Copyright (C) Catalyst.NET 2021
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import sys
from samba import dsdb
from samba import dsdb_dns
from samba . ndr import ndr_unpack , ndr_pack
from samba . samdb import SamDB
from samba . auth import system_session
import ldb
from samba import credentials
from samba . dcerpc import dns , dnsp , dnsserver
2021-06-17 23:10:50 +12:00
from samba . dnsserver import TXTRecord , ARecord
2023-08-25 11:09:52 +12:00
from samba . dnsserver import ipv6_normalise
2021-04-28 17:40:08 +12:00
from samba . tests . subunitrun import SubunitOptions , TestProgram
from samba import werror , WERRORError
from samba . tests . dns_base import DNSTest
import samba . getopt as options
import optparse
import time
2021-06-19 15:43:29 +12:00
from samba . colour import c_RED , c_GREEN , c_DARK_YELLOW
2021-04-28 17:40:08 +12:00
parser = optparse . OptionParser (
" dns_aging.py <server name> <server ip> [options] " )
sambaopts = options . SambaOptions ( parser )
parser . add_option_group ( sambaopts )
# use command line creds if available
credopts = options . CredentialsOptions ( parser )
parser . add_option_group ( credopts )
subunitopts = SubunitOptions ( parser )
parser . add_option_group ( subunitopts )
opts , args = parser . parse_args ( )
if len ( args ) < 2 :
parser . print_usage ( )
sys . exit ( 1 )
LP = sambaopts . get_loadparm ( )
CREDS = credopts . get_credentials ( LP )
SERVER_NAME = args [ 0 ]
SERVER_IP = args [ 1 ]
CREDS . set_krb_forwardable ( credentials . NO_KRB_FORWARDABLE )
DOMAIN = CREDS . get_realm ( ) . lower ( )
# Unix time start, in DNS timestamp (24 * 365.25 * 369)
# These are ballpark extremes for the timestamp.
DNS_TIMESTAMP_1970 = 3234654
DNS_TIMESTAMP_2101 = 4383000
DNS_TIMESTAMP_1981 = 3333333 # a middling timestamp
2021-06-10 23:30:19 +00:00
IPv4_ADDR = " 127.0.0.33 "
IPv6_ADDR = " ::1 "
IPv4_ADDR_2 = " 127.0.0.66 "
IPv6_ADDR_2 = " 1::1 "
2021-04-28 17:40:08 +12:00
def get_samdb ( ) :
return SamDB ( url = f " ldap:// { SERVER_IP } " ,
lp = LP ,
session_info = system_session ( ) ,
credentials = CREDS )
def get_file_samdb ( ) :
# For Samba only direct file access, needed for the tombstoning functions.
# (For Windows, we instruct it to tombstone over RPC).
return SamDB ( url = LP . samdb_url ( ) ,
lp = LP ,
session_info = system_session ( ) ,
credentials = CREDS )
def get_rpc ( ) :
return dnsserver . dnsserver ( f " ncacn_ip_tcp: { SERVER_IP } [sign] " , LP , CREDS )
def create_zone ( name , rpc = None , aging = True ) :
if rpc is None :
rpc = get_rpc ( )
z = dnsserver . DNS_RPC_ZONE_CREATE_INFO_LONGHORN ( )
z . pszZoneName = name
z . dwZoneType = dnsp . DNS_ZONE_TYPE_PRIMARY
z . fAging = int ( bool ( aging ) )
z . dwDpFlags = dnsserver . DNS_DP_DOMAIN_DEFAULT
z . fDsIntegrated = 1
z . fLoadExisting = 1
z . fAllowUpdate = dnsp . DNS_ZONE_UPDATE_UNSECURE
rpc . DnssrvOperation2 ( dnsserver . DNS_CLIENT_VERSION_LONGHORN ,
0 ,
SERVER_IP ,
None ,
0 ,
' ZoneCreate ' ,
dnsserver . DNSSRV_TYPEID_ZONE_CREATE ,
z )
def delete_zone ( name , rpc = None ) :
if rpc is None :
rpc = get_rpc ( )
rpc . DnssrvOperation2 ( dnsserver . DNS_CLIENT_VERSION_LONGHORN ,
0 ,
SERVER_IP ,
name ,
0 ,
' DeleteZoneFromDs ' ,
dnsserver . DNSSRV_TYPEID_NULL ,
None )
def txt_s_list ( txt ) :
""" Construct a txt record string list, which is a fiddly matter. """
if isinstance ( txt , str ) :
txt = [ txt ]
s_list = dnsp . string_list ( )
s_list . count = len ( txt )
s_list . str = txt
return s_list
2021-06-10 23:29:15 +00:00
def make_txt_record ( txt ) :
r = dns . txt_record ( )
r . txt = txt_s_list ( txt )
return r
2021-04-28 17:40:08 +12:00
def copy_rec ( rec ) :
copy = dnsserver . DNS_RPC_RECORD ( )
copy . wType = rec . wType
copy . dwFlags = rec . dwFlags
copy . dwSerial = rec . dwSerial
copy . dwTtlSeconds = rec . dwTtlSeconds
copy . data = rec . data
copy . dwTimeStamp = rec . dwTimeStamp
return copy
2021-06-10 23:29:15 +00:00
def guess_wtype ( data ) :
if isinstance ( data , list ) :
data = make_txt_record ( data )
return ( data , dnsp . DNS_TYPE_TXT )
if " : " in data :
return ( data , dnsp . DNS_TYPE_AAAA )
return ( data , dnsp . DNS_TYPE_A )
2021-04-28 17:40:08 +12:00
class TestDNSAging ( DNSTest ) :
""" Probe DNS aging and scavenging, using LDAP and RPC to set and test
the timestamps behind DNS ' s back. " " "
server = SERVER_NAME
server_ip = SERVER_IP
creds = CREDS
def setUp ( self ) :
super ( ) . setUp ( )
self . rpc_conn = get_rpc ( )
self . samdb = get_samdb ( )
# We always have a zone of our own named after the test function.
self . zone = self . id ( ) . rsplit ( ' . ' , 1 ) [ 1 ]
self . addCleanup ( delete_zone , self . zone , self . rpc_conn )
try :
create_zone ( self . zone , self . rpc_conn )
except WERRORError as e :
if e . args [ 0 ] != werror . WERR_DNS_ERROR_ZONE_ALREADY_EXISTS :
raise
print ( f " zone { self . zone } already exists " )
# Though we set this in create_zone(), that doesn't work on
# Windows, so we repeat again here.
self . set_zone_int_params ( AllowUpdate = dnsp . DNS_ZONE_UPDATE_UNSECURE )
self . zone_dn = ( f " DC= { self . zone } ,CN=MicrosoftDNS,DC=DomainDNSZones, "
f " { self . samdb . get_default_basedn ( ) } " )
def set_zone_int_params ( self , zone = None , * * kwargs ) :
""" Keyword arguments set parameters on the zone. e.g.:
self . set_zone_int_params ( Aging = 1 ,
RefreshInterval = 222 )
See [ MS - DNSP ] 3.1 .1 .2 .1 " DNS Zone Integer Properties " for names .
"""
if zone is None :
zone = self . zone
for key , val in kwargs . items ( ) :
name_param = dnsserver . DNS_RPC_NAME_AND_PARAM ( )
name_param . dwParam = val
name_param . pszNodeName = key
try :
self . rpc_conn . DnssrvOperation2 (
dnsserver . DNS_CLIENT_VERSION_LONGHORN ,
0 ,
SERVER_IP ,
zone ,
0 ,
' ResetDwordProperty ' ,
dnsserver . DNSSRV_TYPEID_NAME_AND_PARAM ,
name_param )
except WERRORError as e :
self . fail ( str ( e ) )
def rpc_replace ( self , name , old = None , new = None ) :
""" Replace a DNS_RPC_RECORD or DNS_RPC_RECORD_BUF """
# wrap our recs, if necessary
if isinstance ( new , dnsserver . DNS_RPC_RECORD ) :
rec = new
new = dnsserver . DNS_RPC_RECORD_BUF ( )
new . rec = rec
if isinstance ( old , dnsserver . DNS_RPC_RECORD ) :
rec = old
old = dnsserver . DNS_RPC_RECORD_BUF ( )
old . rec = rec
try :
self . rpc_conn . DnssrvUpdateRecord2 (
dnsserver . DNS_CLIENT_VERSION_LONGHORN ,
0 ,
SERVER_IP ,
self . zone ,
name ,
new ,
old )
except WERRORError as e :
self . fail ( f " could not replace record ( { e } ) " )
def get_unique_txt_record ( self , name , txt ) :
""" Get the TXT record on Name with value txt, asserting that there is
only one . """
if isinstance ( txt , str ) :
txt = [ txt ]
recs = self . ldap_get_records ( name )
match = None
for r in recs :
if r . wType != dnsp . DNS_TYPE_TXT :
continue
txt2 = [ x for x in r . data . str ]
if txt2 == txt :
self . assertIsNone ( match )
match = r
return match
2021-06-10 23:29:15 +00:00
def get_unique_ip_record ( self , name , addr , wtype = None ) :
""" Get an A or AAAA record on name with the matching data. """
if wtype is None :
2021-06-10 23:30:19 +00:00
addr , wtype = guess_wtype ( addr )
2021-06-10 23:29:15 +00:00
recs = self . ldap_get_records ( name )
# We need to use the internal dns_record_match because not all
# forms always match on strings (e.g. IPv6)
rec = dnsp . DnssrvRpcRecord ( )
rec . wType = wtype
rec . data = addr
match = None
for r in recs :
if dsdb_dns . records_match ( r , rec ) :
self . assertIsNone ( match )
match = r
return match
def dns_query ( self , name , qtype = dns . DNS_QTYPE_ALL ) :
""" make a query, which might help Windows notice LDAP changes """
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
fullname = " %s . %s " % ( name , self . zone )
q = self . make_name_question ( fullname , qtype , dns . DNS_QCLASS_IN )
self . finish_name_packet ( p , [ q ] )
r , rp = self . dns_transaction_udp ( p , host = SERVER_IP )
return r
def dns_update_non_text ( self , name ,
data ,
wtype = None ,
qclass = dns . DNS_QCLASS_IN ) :
if wtype is None :
data , wtype = guess_wtype ( data )
if qclass == dns . DNS_QCLASS_IN :
ttl = 123
else :
ttl = 0
fullname = " %s . %s " % ( name , self . zone )
p = self . make_name_packet ( dns . DNS_OPCODE_UPDATE )
u = self . make_name_question ( self . zone ,
dns . DNS_QTYPE_SOA ,
dns . DNS_QCLASS_IN )
self . finish_name_packet ( p , [ u ] )
r = dns . res_rec ( )
r . name = fullname
r . rr_type = wtype
r . rr_class = qclass
r . ttl = ttl
2021-06-18 18:32:22 +12:00
if data is not None :
r . length = 0xffff
r . rdata = data
else :
r . length = 0
2021-06-10 23:29:15 +00:00
p . nscount = 1
p . nsrecs = [ r ]
( code , response ) = self . dns_transaction_udp ( p , host = SERVER_IP )
self . assert_dns_rcode_equals ( code , dns . DNS_RCODE_OK )
return response
2021-06-14 23:36:57 +12:00
def dns_delete ( self , name , data , wtype = None ) :
return self . dns_update_non_text ( name ,
data ,
wtype ,
qclass = dns . DNS_QCLASS_NONE )
2021-06-18 18:32:22 +12:00
def dns_delete_type ( self , name , wtype ) :
return self . dns_update_non_text ( name ,
None ,
wtype ,
qclass = dns . DNS_QCLASS_ANY )
2021-04-28 17:40:08 +12:00
def dns_update_record ( self , name , txt , ttl = 900 ) :
if isinstance ( txt , str ) :
txt = [ txt ]
p = self . make_txt_update ( name , txt , self . zone , ttl = ttl )
( code , response ) = self . dns_transaction_udp ( p , host = SERVER_IP )
2021-06-11 17:31:54 +12:00
if code . operation & dns . DNS_RCODE == dns . DNS_RCODE_REFUSED :
# sometimes you might forget this
print ( " \n \n got DNS_RCODE_REFUSED \n " )
print ( " Are you running this in the fl2003 environment? \n " )
print ( " try `SELFTEST_TESTENV= ' fl2003dc:local ' make testenv` \n \n " )
2021-04-28 17:40:08 +12:00
self . assert_dns_rcode_equals ( code , dns . DNS_RCODE_OK )
return self . get_unique_txt_record ( name , txt )
def rpc_update_record ( self , name , txt , * * kwargs ) :
""" Add the record that self.dns_update_record() would add, via the
dnsserver RPC pipe .
As with DNS update , if the record already exists , we replace it .
"""
if isinstance ( txt , str ) :
txt = [ txt ]
old = TXTRecord ( txt )
rec = TXTRecord ( txt )
for k , v in kwargs . items ( ) :
setattr ( rec , k , v )
try :
self . rpc_replace ( name , old , rec )
except AssertionError as e :
# we have caught and wrapped the WERRor inside
if ' WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST ' not in str ( e ) :
raise
self . rpc_replace ( name , None , rec )
return self . get_unique_txt_record ( name , txt )
2021-06-14 23:30:23 +12:00
def rpc_delete_txt ( self , name , txt ) :
if isinstance ( txt , str ) :
txt = [ txt ]
old = TXTRecord ( txt )
self . rpc_replace ( name , old , None )
2021-04-28 17:40:08 +12:00
def get_one_node ( self , name ) :
2023-08-29 17:37:31 +12:00
self . assertIsInstance ( name , str )
2021-04-28 17:40:08 +12:00
expr = f " (&(objectClass=dnsNode)(name= { name } )) "
nodes = self . samdb . search ( base = self . zone_dn ,
scope = ldb . SCOPE_SUBTREE ,
expression = expr ,
attrs = [ " dnsRecord " , " dNSTombstoned " , " name " ] )
if len ( nodes ) > 1 :
self . fail (
f " expected 0 or 1 dnsNodes for { name } , found { len ( nodes ) } " )
if len ( nodes ) == 0 :
return None
return nodes [ 0 ]
def ldap_get_records ( self , name ) :
node = self . get_one_node ( name )
if node is None :
return [ ]
records = node . get ( ' dnsRecord ' )
return [ ndr_unpack ( dnsp . DnssrvRpcRecord , r ) for r in records ]
2021-06-14 23:37:37 +12:00
def ldap_get_non_tombstoned_records ( self , name ) :
all_records = self . ldap_get_records ( name )
records = [ ]
for r in all_records :
if r . wType != dnsp . DNS_TYPE_TOMBSTONE :
records . append ( r )
return records
2021-04-28 17:40:08 +12:00
def assert_tombstoned ( self , name , tombstoned = True , timestamp = None ) :
# If run with tombstoned=False, assert it isn't tombstoned
# (and has no traces of tombstone). Otherwise assert it has
# all the necessary bits.
2021-06-16 22:48:56 +00:00
#
# with timestamp=<non-zero number of hours>, we assert that
# the nttime timestamp is about that time.
#
# with timestamp=None, we assert it is within a century or so.
#
# with timestamp=False (or 0), we don't assert on it.
2021-04-28 17:40:08 +12:00
node = self . get_one_node ( name )
if node is None :
self . fail ( f " no node named { name } " )
dnsts = node . get ( " dNSTombstoned " )
if dnsts is None :
is_tombstoned = False
else :
self . assertEqual ( len ( dnsts ) , 1 )
if dnsts [ 0 ] == b ' TRUE ' :
is_tombstoned = True
else :
is_tombstoned = False
if tombstoned != is_tombstoned :
if is_tombstoned :
self . fail ( f " { name } is tombstoned " )
else :
self . fail ( f " { name } is not tombstoned " )
recs = self . ldap_get_records ( name )
if is_tombstoned :
self . assertEqual ( len ( recs ) , 1 )
self . assertEqual ( recs [ 0 ] . wType , dnsp . DNS_TYPE_TOMBSTONE )
if timestamp is None :
self . assert_nttime_in_hour_range ( recs [ 0 ] . data )
2021-06-16 22:48:56 +00:00
elif timestamp :
2021-04-28 17:40:08 +12:00
self . assert_nttime_in_hour_range ( recs [ 0 ] . data ,
timestamp - 3 ,
timestamp + 3 )
else :
for r in recs :
self . assertNotEqual ( recs [ 0 ] . wType , dnsp . DNS_TYPE_TOMBSTONE )
def ldap_replace_records ( self , name , records ) :
# We use raw ldap to avoid the "helpfulness" of dsdb_dns.replace()
dn = f ' DC= { name } , { self . zone_dn } '
msg = ldb . Message . from_dict ( self . samdb ,
{ ' dn ' : dn ,
' dnsRecord ' : [ ndr_pack ( r ) for r in records ]
} ,
ldb . FLAG_MOD_REPLACE )
try :
self . samdb . modify ( msg )
except ldb . LdbError as e :
if ' LDAP_NO_SUCH_OBJECT ' not in e . args [ 1 ] :
raise
# We need to do an add
msg [ " objectClass " ] = [ " top " , " dnsNode " ]
msg [ " dnsRecord " ] . set_flags ( ldb . FLAG_MOD_ADD )
self . samdb . add ( msg )
2021-06-10 23:29:15 +00:00
def ldap_update_core ( self , name , wtype , data , * * kwargs ) :
""" This one is not TXT specific. """
2021-04-28 17:40:08 +12:00
records = self . ldap_get_records ( name )
# default values
rec = dnsp . DnssrvRpcRecord ( )
2021-06-10 23:29:15 +00:00
rec . wType = wtype
2021-04-28 17:40:08 +12:00
rec . rank = dnsp . DNS_RANK_ZONE
rec . dwTtlSeconds = 900
rec . dwSerial = 110
rec . dwTimeStamp = 0
2021-06-10 23:29:15 +00:00
rec . data = data
2021-04-28 17:40:08 +12:00
# override defaults, as required
for k , v in kwargs . items ( ) :
setattr ( rec , k , v )
for i , r in enumerate ( records [ : ] ) :
if dsdb_dns . records_match ( r , rec ) :
records [ i ] = rec
break
else : # record not found
records . append ( rec )
self . ldap_replace_records ( name , records )
2021-06-10 23:29:15 +00:00
return rec
def ldap_update_record ( self , name , txt , * * kwargs ) :
""" Add the record that self.dns_update_record() would add, via ldap,
thus allowing us to set additional dnsRecord features like
dwTimestamp .
"""
rec = self . ldap_update_core ( name ,
dnsp . DNS_TYPE_TXT ,
txt_s_list ( txt ) ,
* * kwargs )
2021-04-28 17:40:08 +12:00
recs = self . ldap_get_records ( name )
match = None
for r in recs :
if r . wType != rec . wType :
continue
if r . data . str == rec . data . str :
self . assertIsNone ( match , f " duplicate records for { name } " )
match = r
self . assertEqual ( match . rank , rec . rank & 255 )
self . assertEqual ( match . dwTtlSeconds , rec . dwTtlSeconds )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( match . dwTimeStamp , rec . dwTimeStamp )
2021-04-28 17:40:08 +12:00
return match
2021-06-10 23:29:15 +00:00
def ldap_delete_record ( self , name , data , wtype = dnsp . DNS_TYPE_TXT ) :
2021-04-28 17:40:08 +12:00
rec = dnsp . DnssrvRpcRecord ( )
2021-06-10 23:29:15 +00:00
if wtype == dnsp . DNS_TYPE_TXT :
data = txt_s_list ( data )
rec . wType = wtype
rec . data = data
2021-04-28 17:40:08 +12:00
records = self . ldap_get_records ( name )
for i , r in enumerate ( records [ : ] ) :
if dsdb_dns . records_match ( r , rec ) :
del records [ i ]
break
else :
2021-06-10 23:29:15 +00:00
self . fail ( f " record { data } not found " )
2021-04-28 17:40:08 +12:00
self . ldap_replace_records ( name , records )
2021-06-10 23:29:15 +00:00
def add_ip_record ( self , name , addr , wtype = None , * * kwargs ) :
if wtype is None :
addr , wtype = guess_wtype ( addr )
rec = self . ldap_update_core ( name ,
wtype ,
addr ,
* * kwargs )
recs = self . ldap_get_records ( name )
match = None
for r in recs :
if dsdb_dns . records_match ( r , rec ) :
self . assertIsNone ( match , f " duplicate records for { name } " )
match = r
self . assertEqual ( match . rank , rec . rank & 255 )
self . assertEqual ( match . dwTtlSeconds , rec . dwTtlSeconds )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( match . dwTimeStamp , rec . dwTimeStamp )
2021-06-10 23:29:15 +00:00
return match
2021-04-28 17:40:08 +12:00
def ldap_modify_timestamps ( self , name , delta ) :
records = self . ldap_get_records ( name )
for rec in records :
rec . dwTimeStamp + = delta
self . ldap_replace_records ( name , records )
def get_rpc_records ( self , name , dns_type = None ) :
if dns_type is None :
dns_type = dnsp . DNS_TYPE_ALL
select_flags = dnsserver . DNS_RPC_VIEW_AUTHORITY_DATA
buflen , res = self . rpc_conn . DnssrvEnumRecords2 (
dnsserver . DNS_CLIENT_VERSION_LONGHORN ,
0 ,
SERVER_IP ,
self . zone ,
name ,
None ,
dns_type ,
select_flags ,
None ,
None )
recs = [ ]
if not res or res . count == 0 :
return [ ]
for rec in res . rec :
recs . extend ( rec . records )
return recs
def dns_tombstone ( self , name ,
epoch_hours = DNS_TIMESTAMP_1981 ,
epoch_nttime = None ) :
dn = f ' DC= { name } , { self . zone_dn } '
r = dnsp . DnssrvRpcRecord ( )
r . wType = dnsp . DNS_TYPE_TOMBSTONE
# r.dwTimeStamp is a 32 bit value in hours, and r.data is an
# NTTIME (100 nanosecond intervals), both in the 1601 epoch. A
2023-08-29 14:23:51 +12:00
# tombstone will have both, but expiration calculations use
2021-04-28 17:40:08 +12:00
# the r.data NTTIME EntombedTime timestamp (see [MS-DNSP]).
r . dwTimeStamp = epoch_hours
if epoch_nttime is None :
r . data = epoch_hours * 3600 * 10 * 1000 * 1000
else :
r . data = epoch_nttime
msg = ldb . Message . from_dict ( self . samdb ,
{ ' dn ' : dn ,
' dnsRecord ' : [ ndr_pack ( r ) ] ,
' dnsTombstoned ' : ' TRUE '
} ,
ldb . FLAG_MOD_REPLACE )
try :
self . samdb . modify ( msg )
except ldb . LdbError as e :
if ' LDAP_NO_SUCH_OBJECT ' not in e . args [ 1 ] :
raise
# We need to do an add
msg [ " objectClass " ] = [ " top " , " dnsNode " ]
self . samdb . add ( msg )
def set_aging ( self , enable = False ) :
self . set_zone_int_params ( Aging = int ( bool ( enable ) ) )
def assert_timestamp_in_ballpark ( self , rec ) :
self . assertGreater ( rec . dwTimeStamp , DNS_TIMESTAMP_1970 )
self . assertLess ( rec . dwTimeStamp , DNS_TIMESTAMP_2101 )
def assert_nttime_in_hour_range ( self , t ,
hour_min = DNS_TIMESTAMP_1970 ,
hour_max = DNS_TIMESTAMP_2101 ) :
t / / = int ( 3600 * 1e7 )
self . assertGreater ( t , hour_min )
self . assertLess ( t , hour_max )
def assert_soon_after ( self , timestamp , reference ) :
""" Assert that a timestamp is the same or very slightly higher than a
reference timestamp .
Typically we expect the timestamps to be identical , unless an
hour has clicked over since the reference was taken . However
we allow one more hour in case it happens during a daylight
savings transition or something .
"""
2021-06-10 11:58:43 +12:00
if hasattr ( timestamp , ' dwTimeStamp ' ) :
timestamp = timestamp . dwTimeStamp
if hasattr ( reference , ' dwTimeStamp ' ) :
reference = reference . dwTimeStamp
diff = timestamp - reference
days = abs ( diff / 24.0 )
if diff < 0 :
msg = f " timestamp is { days } days ( { abs ( diff ) } hours) before reference "
elif diff > 2 :
msg = f " timestamp is { days } days ( { diff } hours) after reference "
else :
return
raise AssertionError ( msg )
def assert_timestamps_equal ( self , ts1 , ts2 ) :
""" Just like assertEqual(), but tells us the difference, not the
absolute values . e . g :
self . assertEqual ( a , b )
AssertionError : 3685491 != 3685371
self . assert_timestamps_equal ( a , b )
AssertionError : - 120 ( first is 5.0 days earlier than second )
Also , we turn a record into a timestamp if we need
"""
if hasattr ( ts1 , ' dwTimeStamp ' ) :
ts1 = ts1 . dwTimeStamp
if hasattr ( ts2 , ' dwTimeStamp ' ) :
ts2 = ts2 . dwTimeStamp
if ts1 == ts2 :
return
diff = ts1 - ts2
days = abs ( diff / 24.0 )
if ts1 == 0 or ts2 == 0 :
# when comparing to zero we don't want the number of days.
msg = f " timestamp { ts1 } != { ts2 } "
elif diff > 0 :
msg = f " { ts1 } is { days } days ( { diff } hours) after { ts2 } "
else :
msg = f " { ts1 } is { days } days ( { abs ( diff ) } hours) before { ts2 } "
raise AssertionError ( msg )
2021-04-28 17:40:08 +12:00
def test_update_timestamps_aging_off_then_on ( self ) :
# we will add a record with aging off
# it will have the current timestamp
self . set_aging ( False )
name = ' timestamp-now '
name2 = ' timestamp-eightdays '
rec = self . dns_update_record ( name , [ name ] )
start_time = rec . dwTimeStamp
self . assert_timestamp_in_ballpark ( rec )
# alter the timestamp -8 days using RPC
# with aging turned off, we expect no change
# when aging is on, we expect change
eight_days_ago = start_time - 8 * 24
rec = self . ldap_update_record ( name2 , [ name2 ] ,
dwTimeStamp = eight_days_ago )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( rec . dwTimeStamp , eight_days_ago )
2021-04-28 17:40:08 +12:00
# if aging was on, this would change
rec = self . dns_update_record ( name2 , [ name2 ] )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( rec . dwTimeStamp , eight_days_ago )
2021-04-28 17:40:08 +12:00
self . set_aging ( True )
rec = self . dns_update_record ( name2 , [ name2 ] )
self . assertGreaterEqual ( rec . dwTimeStamp , start_time )
def test_rpc_update_timestamps ( self ) :
# RPC always sets timestamps to zero on Windows.
self . set_aging ( False )
name = ' timestamp-now '
rec = self . dns_update_record ( name , [ name ] )
start_time = rec . dwTimeStamp
self . assert_timestamp_in_ballpark ( rec )
# attempt to alter the timestamp to something close by.
eight_days_ago = start_time - 8 * 24
rec = self . rpc_update_record ( name , [ name ] ,
dwTimeStamp = eight_days_ago )
self . assertEqual ( rec . dwTimeStamp , 0 )
# try again, with aging on
self . set_aging ( True )
rec = self . rpc_update_record ( name , [ name ] ,
dwTimeStamp = eight_days_ago )
self . assertEqual ( rec . dwTimeStamp , 0 )
# now that the record is static, a dns update won't change it
rec = self . dns_update_record ( name , [ name ] )
self . assertEqual ( rec . dwTimeStamp , 0 )
# but another record on the same node will behave normally
# i.e. the node is not static, the record is.
name2 = ' timestamp-eightdays '
rec = self . dns_update_record ( name2 , [ name2 ] )
self . assert_soon_after ( rec . dwTimeStamp ,
start_time )
def get_txt_timestamps ( self , name , * txts ) :
records = self . ldap_get_records ( name )
ret = [ ]
for t in txts :
for r in records :
t2 = [ x for x in r . data . str ]
if t == t2 :
ret . append ( r . dwTimeStamp )
return ret
def test_update_aging_disabled_2 ( self ) :
# With aging disabled, Windows updates the timestamps of all
# records when one is updated.
name = ' test '
txt1 = [ ' test txt ' ]
txt2 = [ ' test ' , ' txt2 ' ]
txt3 = [ ' test ' , ' txt3 ' ]
self . set_aging ( False )
current_time = self . dns_update_record ( name , txt1 ) . dwTimeStamp
six_days_ago = current_time - 6 * 24
eight_days_ago = current_time - 8 * 24
fifteen_days_ago = current_time - 15 * 24
hundred_days_ago = current_time - 100 * 24
thousand_days_ago = current_time - 1000 * 24
for timestamp in ( current_time ,
six_days_ago ,
eight_days_ago ,
fifteen_days_ago ,
hundred_days_ago ,
thousand_days_ago ) :
# wind back
self . ldap_update_record ( name , txt1 , dwTimeStamp = timestamp )
self . assertEqual ( self . get_txt_timestamps ( name , txt1 ) , [ timestamp ] )
# no change here
update_timestamp = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( update_timestamp , timestamp )
2021-04-28 17:40:08 +12:00
# adding a fresh record
for timestamp in ( current_time ,
six_days_ago ,
eight_days_ago ,
fifteen_days_ago ,
hundred_days_ago ,
thousand_days_ago ,
100000 ,
100 ) :
# wind back
timestamp1 = self . ldap_update_record (
name ,
txt1 ,
dwTimeStamp = timestamp ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp )
2021-04-28 17:40:08 +12:00
self . dns_update_record ( name , txt2 )
timestamps = self . get_txt_timestamps ( name , txt1 , txt2 )
self . assertEqual ( timestamps , [ timestamp , current_time ] )
self . ldap_delete_record ( name , txt2 )
timestamps = self . get_txt_timestamps ( name , txt1 )
self . assertEqual ( timestamps , [ timestamp ] )
# add record 2.
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
for timestamp in ( current_time ,
six_days_ago ,
eight_days_ago ,
fifteen_days_ago ,
hundred_days_ago ,
thousand_days_ago ,
100000 ,
100 ) :
# wind back
self . ldap_update_record ( name , txt1 , dwTimeStamp = timestamp )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp )
2021-04-28 17:40:08 +12:00
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
# txt1 timestamp is now current time
timestamps = self . get_txt_timestamps ( name , txt1 , txt2 )
self . assertEqual ( timestamps , [ timestamp , current_time ] )
# with 3 records, no change
for timestamp in ( current_time ,
six_days_ago ,
eight_days_ago ,
fifteen_days_ago ,
hundred_days_ago ,
thousand_days_ago ,
100000 ,
10 ) :
# wind back
self . ldap_update_record ( name , txt1 , dwTimeStamp = timestamp )
self . ldap_update_record ( name , txt2 , dwTimeStamp = timestamp )
self . ldap_update_record ( name , txt3 , dwTimeStamp = ( timestamp + 30 ) )
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , timestamp + 30 )
2021-04-28 17:40:08 +12:00
self . dns_update_record ( name , txt2 ) . dwTimeStamp
timestamps = self . get_txt_timestamps ( name , txt1 , txt2 , txt3 )
self . assertEqual ( timestamps , [ timestamp ,
timestamp ,
timestamp + 30 ] )
# with 3 records, one of which is static
# first we set the updatee's timestamp to a recognisable number
self . ldap_update_record ( name , txt2 , dwTimeStamp = 999999 )
for timestamp in ( current_time ,
six_days_ago ,
eight_days_ago ,
fifteen_days_ago ,
hundred_days_ago ,
thousand_days_ago ,
100000 ,
10 ) :
# wind back
self . ldap_update_record ( name , txt1 , dwTimeStamp = 0 )
self . ldap_update_record ( name , txt3 , dwTimeStamp = ( timestamp - 9 ) )
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , timestamp - 9 )
2021-04-28 17:40:08 +12:00
self . dns_update_record ( name , txt2 )
timestamps = self . get_txt_timestamps ( name , txt1 , txt2 , txt3 )
self . assertEqual ( timestamps , [ 0 ,
999999 ,
timestamp - 9 ] )
# with 3 records, updating one which is static
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
for timestamp in ( current_time ,
six_days_ago ,
eight_days_ago ,
fifteen_days_ago ,
hundred_days_ago ,
thousand_days_ago ,
100000 ,
10 ) :
# wind back
self . ldap_update_record ( name , txt1 , dwTimeStamp = 0 )
self . ldap_update_record ( name , txt2 , dwTimeStamp = 0 )
self . ldap_update_record ( name , txt3 , dwTimeStamp = ( timestamp + 30 ) )
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , timestamp + 30 )
2021-04-28 17:40:08 +12:00
self . dns_update_record ( name , txt2 ) . dwTimeStamp
timestamps = self . get_txt_timestamps ( name , txt1 , txt2 , txt3 )
self . assertEqual ( timestamps , [ 0 ,
0 ,
timestamp + 30 ] )
# with 3 records, after the static nodes have been replaced
self . ldap_update_record ( name , txt1 , dwTimeStamp = 777777 )
self . ldap_update_record ( name , txt2 , dwTimeStamp = 888888 )
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
for timestamp in ( current_time ,
six_days_ago ,
eight_days_ago ,
fifteen_days_ago ,
hundred_days_ago ,
thousand_days_ago ,
100000 ,
10 ) :
# wind back
self . ldap_update_record ( name , txt3 , dwTimeStamp = ( timestamp ) )
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , timestamp )
2021-04-28 17:40:08 +12:00
self . dns_update_record ( name , txt2 )
timestamps = self . get_txt_timestamps ( name , txt1 , txt2 , txt3 )
self . assertEqual ( timestamps , [ 777777 ,
888888 ,
timestamp ] )
def _test_update_aging_disabled_n_days_ago ( self , n_days ) :
name = ' test '
txt1 = [ ' 1 ' ]
txt2 = [ ' 2 ' ]
self . set_aging ( False )
current_time = self . dns_update_record ( name , txt1 ) . dwTimeStamp
# rewind timestamp using ldap
self . ldap_modify_timestamps ( name , n_days * - 24 )
n_days_ago = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
self . assertGreater ( current_time , n_days_ago )
# no change when updating this record
update_timestamp = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( update_timestamp , n_days_ago )
2021-04-28 17:40:08 +12:00
# add another record, which should have the current timestamp
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
# get the original record timestamp. NOW it matches current_time
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp2 )
2021-04-28 17:40:08 +12:00
# let's repeat that, this time with txt2 existing
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
# this update is not an add
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
# now timestamp1 is not changed
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
# delete record2, try again
self . ldap_delete_record ( name , txt2 )
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
# here we are re-adding the deleted record
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
# It gets weird HERE.
# note how the SIBLING of the deleted, re-added record differs
# from the sibling of freshly added record, depending on the
# time difference.
if n_days < = 7 :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
else :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp2 )
2021-04-28 17:40:08 +12:00
# re-timestamp record2, try again
self . ldap_update_record ( name , txt2 , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
# no change
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , n_days_ago )
2021-04-28 17:40:08 +12:00
# also no change
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp2 )
2021-04-28 17:40:08 +12:00
# let's introduce another record
txt3 = [ ' 3 ' ]
self . ldap_update_record ( name , txt2 , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
self . assert_soon_after ( timestamp3 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
if n_days < = 7 :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
else :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp3 )
2021-04-28 17:40:08 +12:00
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , timestamp3 )
2021-04-28 17:40:08 +12:00
self . ldap_delete_record ( name , txt3 )
2021-06-14 23:36:04 +12:00
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
2021-04-28 17:40:08 +12:00
self . assert_soon_after ( timestamp3 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
if n_days < = 7 :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
else :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp3 )
2021-04-28 17:40:08 +12:00
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , timestamp3 )
2021-04-28 17:40:08 +12:00
# and here we'll make txt3 static
txt4 = [ ' 4 ' ]
# and here we'll make txt1 static
self . ldap_update_record ( name , txt1 , dwTimeStamp = 0 )
self . ldap_update_record ( name , txt2 , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , txt3 , dwTimeStamp = n_days_ago )
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
timestamp4 = self . dns_update_record ( name , txt4 ) . dwTimeStamp
self . assertEqual ( timestamp1 , 0 )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , n_days_ago )
self . assert_timestamps_equal ( timestamp3 , n_days_ago )
2021-04-28 17:40:08 +12:00
self . assert_soon_after ( timestamp4 , current_time )
def test_update_aging_disabled_in_no_refresh_window ( self ) :
self . _test_update_aging_disabled_n_days_ago ( 4 )
def test_update_aging_disabled_on_no_refresh_boundary ( self ) :
self . _test_update_aging_disabled_n_days_ago ( 7 )
def test_update_aging_disabled_in_refresh_window ( self ) :
self . _test_update_aging_disabled_n_days_ago ( 9 )
def test_update_aging_disabled_beyond_refresh_window ( self ) :
self . _test_update_aging_disabled_n_days_ago ( 16 )
def test_update_aging_disabled_in_eighteenth_century ( self ) :
self . _test_update_aging_disabled_n_days_ago ( 100000 )
def test_update_aging_disabled_static ( self ) :
name = ' test '
txt1 = [ ' 1 ' ]
txt2 = [ ' 2 ' ]
self . set_aging ( False )
current_time = self . dns_update_record ( name , txt1 ) . dwTimeStamp
self . ldap_update_record ( name , txt1 , dwTimeStamp = 0 )
# no change when updating this record
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
self . assertEqual ( timestamp1 , 0 )
# add another record, which should have the current timestamp
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
self . assert_soon_after ( timestamp1 , current_time )
# let's repeat that, this time with txt2 existing
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
# delete record2, try again
self . ldap_delete_record ( name , txt2 )
self . ldap_update_record ( name , txt1 , dwTimeStamp = 0 )
# no change when updating this record
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
self . assertEqual ( timestamp1 , 0 )
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assertEqual ( timestamp2 , 0 )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
self . assertEqual ( timestamp1 , 0 )
# re-timestamp record2, try again
self . ldap_update_record ( name , txt2 , dwTimeStamp = 1 )
self . ldap_update_record ( name , txt1 , dwTimeStamp = 0 )
# no change when updating this record
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , 1 )
2021-04-28 17:40:08 +12:00
def test_update_aging_disabled ( self ) :
# With aging disabled, Windows updates the timestamps of all
# records when one is updated.
name = ' test '
txt1 = [ ' test txt ' ]
txt2 = [ ' test ' , ' txt2 ' ]
txt3 = [ ' test ' , ' txt3 ' ]
minus_6 = - 6 * 24
minus_8 = - 8 * 24
self . set_aging ( False )
current_time = self . dns_update_record ( name , txt1 ) . dwTimeStamp
# rewind timestamp using ldap
self . ldap_modify_timestamps ( name , minus_6 )
after_mod = self . get_unique_txt_record ( name , txt1 )
six_days_ago = after_mod . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( six_days_ago , current_time + minus_6 )
2021-04-28 17:40:08 +12:00
# no change
update_timestamp = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( update_timestamp , six_days_ago )
2021-04-28 17:40:08 +12:00
self . check_query_txt ( name , txt1 , zone = self . zone )
# another record
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
# without aging, timestamp1 is changed!!
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp2 )
2021-04-28 17:40:08 +12:00
# Set both records back to 8 days ago.
self . ldap_modify_timestamps ( name , minus_8 )
eight_days_ago = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( eight_days_ago , current_time + minus_8 )
2021-04-28 17:40:08 +12:00
update2 = self . dns_update_record ( name , txt2 )
# Without aging on, an update should not change the timestamps.
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( update2 . dwTimeStamp , eight_days_ago )
2021-04-28 17:40:08 +12:00
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# Add another txt record. The new record should have the now
# timestamp, and drag the others up with it.
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
self . assert_soon_after ( timestamp3 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp3 )
self . assert_timestamps_equal ( timestamp2 , timestamp3 )
2021-04-28 17:40:08 +12:00
hundred_days_ago = current_time - 100 * 24
thousand_days_ago = current_time - 1000 * 24
record = self . ldap_update_record ( name , txt1 ,
dwTimeStamp = hundred_days_ago )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( record . dwTimeStamp , hundred_days_ago )
2021-04-28 17:40:08 +12:00
record = self . ldap_update_record ( name , txt2 ,
dwTimeStamp = thousand_days_ago )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( record . dwTimeStamp , thousand_days_ago )
2021-04-28 17:40:08 +12:00
# update 3, will others change (because beyond RefreshInterval)? yes.
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp3 , current_time )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , hundred_days_ago )
self . assert_timestamps_equal ( timestamp2 , thousand_days_ago )
2021-04-28 17:40:08 +12:00
fifteen_days_ago = current_time - 15 * 24
self . ldap_update_record ( name , txt3 , dwTimeStamp = fifteen_days_ago )
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
# DNS update has no effect because all records are old
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , thousand_days_ago )
self . assert_timestamps_equal ( timestamp1 , hundred_days_ago )
self . assert_timestamps_equal ( timestamp3 , fifteen_days_ago )
2021-04-28 17:40:08 +12:00
# Does update of old record affect timestamp of refreshable record? No.
self . ldap_update_record ( name , txt3 , dwTimeStamp = eight_days_ago )
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
# DNS update has no effect because all records are old
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , thousand_days_ago )
self . assert_timestamps_equal ( timestamp1 , hundred_days_ago )
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# RPC zeros timestamp, after which updates won't change it.
# BUT it refreshes all others!
self . rpc_update_record ( name , txt2 )
timestamp2 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
self . assertEqual ( timestamp2 , 0 )
self . assert_soon_after ( timestamp1 , current_time )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
def test_update_aging_enabled ( self ) :
name = ' test '
txt1 = [ ' test txt ' ]
txt2 = [ ' test ' , ' txt2 ' ]
txt3 = [ ' test ' , ' txt3 ' ]
txt4 = [ ' 4 ' ]
self . set_aging ( True )
current_time = self . dns_update_record ( name , txt2 ) . dwTimeStamp
six_days_ago = current_time - 6 * 24
eight_days_ago = current_time - 8 * 24
fifteen_days_ago = current_time - 15 * 24
hundred_days_ago = current_time - 100 * 24
self . ldap_update_record ( name , txt1 , dwTimeStamp = six_days_ago )
# with or without aging, a delta of -6 days does not affect
# timestamps, because dwNoRefreshInterval is 7 days.
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , six_days_ago )
2021-04-28 17:40:08 +12:00
self . assert_soon_after ( timestamp2 , current_time )
self . ldap_update_record ( name , txt3 , dwTimeStamp = eight_days_ago )
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# update 1, what happens to 2 and 3? Nothing?
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , six_days_ago )
2021-04-28 17:40:08 +12:00
self . assert_soon_after ( timestamp2 , current_time )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# now set 1 to 8 days, and we should see changes
self . ldap_update_record ( name , txt1 , dwTimeStamp = eight_days_ago )
# update 1, what happens to 2 and 3? Nothing?
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
self . assert_soon_after ( timestamp1 , current_time )
self . assert_soon_after ( timestamp2 , current_time )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# next few ones use these numbers
self . ldap_update_record ( name , txt1 , dwTimeStamp = fifteen_days_ago )
self . ldap_update_record ( name , txt2 , dwTimeStamp = six_days_ago )
self . ldap_update_record ( name , txt3 , dwTimeStamp = eight_days_ago )
# change even though 1 is outside the window
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
self . assert_soon_after ( timestamp1 , current_time )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , six_days_ago )
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# reset 1
self . ldap_update_record ( name , txt1 , dwTimeStamp = fifteen_days_ago )
# no change, because 2 is outside the window
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , fifteen_days_ago )
self . assert_timestamps_equal ( timestamp2 , six_days_ago )
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# 3 changes, others do not
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , fifteen_days_ago )
self . assert_timestamps_equal ( timestamp2 , six_days_ago )
2021-04-28 17:40:08 +12:00
self . assert_soon_after ( timestamp3 , current_time )
# reset 3 to 100 days
self . ldap_update_record ( name , txt3 , dwTimeStamp = hundred_days_ago )
# 3 changes, others do not
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , fifteen_days_ago )
self . assert_timestamps_equal ( timestamp2 , six_days_ago )
2021-04-28 17:40:08 +12:00
self . assert_soon_after ( timestamp3 , current_time )
# reset 1 and 3 to 8 days. does update of 1 affect 3?
self . ldap_update_record ( name , txt1 , dwTimeStamp = eight_days_ago )
self . ldap_update_record ( name , txt3 , dwTimeStamp = eight_days_ago )
# 1 changes, others do not
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
self . assert_soon_after ( timestamp1 , current_time )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , six_days_ago )
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# Try an RPC update, zeroing 1 --> what happens to 3?
timestamp1 = self . rpc_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
self . assertEqual ( timestamp1 , 0 )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , six_days_ago )
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
# with 2 and 3 at 8 days, does static record change things?
self . ldap_update_record ( name , txt2 , dwTimeStamp = eight_days_ago )
# 2 changes, but to zero!
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , 0 )
self . assert_timestamps_equal ( timestamp2 , 0 )
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
2021-04-28 17:40:08 +12:00
self . ldap_update_record ( name , txt2 , dwTimeStamp = six_days_ago )
self . ldap_update_record ( name , txt1 , dwTimeStamp = 3000000 )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , 3000000 )
2021-04-28 17:40:08 +12:00
# dns update remembers that node is static, even with no
# static records.
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
self . assertEqual ( timestamp1 , 0 )
# Add another txt record. The new record should have the now
# timestamp, and the others should remain unchanged.
# BUT somehow record 1 is static!?
timestamp4 = self . dns_update_record ( name , txt4 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , 0 )
self . assert_timestamps_equal ( timestamp2 , six_days_ago )
self . assert_timestamps_equal ( timestamp3 , eight_days_ago )
self . assert_timestamps_equal ( timestamp4 , 0 )
2021-04-28 17:40:08 +12:00
def _test_update_aging_enabled_n_days_ago ( self , n_days ) :
name = ' test '
txt1 = [ ' 1 ' ]
txt2 = [ ' 2 ' ]
delta = n_days * - 24
self . set_aging ( True )
current_time = self . dns_update_record ( name , txt1 ) . dwTimeStamp
# rewind timestamp using ldap
self . ldap_modify_timestamps ( name , delta )
n_days_ago = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
self . assertGreater ( current_time , n_days_ago )
# update changes timestamp depending on time.
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
if n_days < = 7 :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
else :
self . assert_soon_after ( timestamp1 , current_time )
# add another record, which should have the current timestamp
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
# first record should not have changed
timestamp1_b = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp1_b )
2021-04-28 17:40:08 +12:00
# let's repeat that, this time with txt2 existing
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp1_b )
2021-04-28 17:40:08 +12:00
# this update is not an add. record 2 is already up-to-date
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
# now timestamp1 is not changed
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp1_b )
2021-04-28 17:40:08 +12:00
# delete record2, try again
self . ldap_delete_record ( name , txt2 )
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
if n_days < = 7 :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
else :
self . assert_soon_after ( timestamp1 , current_time )
# here we are re-adding the deleted record
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
self . assert_soon_after ( timestamp2 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
# It gets weird HERE.
# note how the SIBLING of the deleted, re-added record differs
# from the sibling of freshly added record, depending on the
# time difference.
if n_days < = 7 :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
2021-04-28 17:40:08 +12:00
else :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp2 )
2021-04-28 17:40:08 +12:00
# re-timestamp record2, try again
self . ldap_update_record ( name , txt2 , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
# this should make no difference
timestamp1_b = self . dns_update_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp1_b )
2021-04-28 17:40:08 +12:00
# no change
timestamp2 = self . dns_update_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp2 , timestamp1 )
2021-04-28 17:40:08 +12:00
# also no change
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , timestamp2 )
2021-04-28 17:40:08 +12:00
# let's introduce another record
txt3 = [ ' 3 ' ]
self . ldap_update_record ( name , txt2 , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , txt1 , dwTimeStamp = n_days_ago )
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
self . assert_soon_after ( timestamp3 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
self . assert_timestamps_equal ( timestamp2 , n_days_ago )
2021-04-28 17:40:08 +12:00
self . ldap_delete_record ( name , txt3 )
2021-06-14 23:36:04 +12:00
timestamp3 = self . dns_update_record ( name , txt3 ) . dwTimeStamp
2021-04-28 17:40:08 +12:00
self . assert_soon_after ( timestamp3 , current_time )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , n_days_ago )
self . assert_timestamps_equal ( timestamp2 , n_days_ago )
2021-04-28 17:40:08 +12:00
txt4 = [ ' 4 ' ]
# Because txt1 is static, txt4 is static
self . ldap_update_record ( name , txt1 , dwTimeStamp = 0 )
self . ldap_update_record ( name , txt2 , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , txt3 , dwTimeStamp = n_days_ago )
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
timestamp4 = self . dns_update_record ( name , txt4 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , 0 )
self . assert_timestamps_equal ( timestamp2 , n_days_ago )
self . assert_timestamps_equal ( timestamp3 , n_days_ago )
self . assert_timestamps_equal ( timestamp4 , 0 )
2021-04-28 17:40:08 +12:00
longer_ago = n_days_ago / / 2
# remove all static records.
self . ldap_delete_record ( name , txt4 )
self . ldap_update_record ( name , txt1 , dwTimeStamp = longer_ago )
self . ldap_update_record ( name , txt2 , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , txt3 , dwTimeStamp = n_days_ago )
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , longer_ago )
2021-04-28 17:40:08 +12:00
timestamp4 = self . dns_update_record ( name , txt4 ) . dwTimeStamp
timestamp2 = self . get_unique_txt_record ( name , txt2 ) . dwTimeStamp
timestamp3 = self . get_unique_txt_record ( name , txt3 ) . dwTimeStamp
timestamp1 = self . get_unique_txt_record ( name , txt1 ) . dwTimeStamp
2023-08-29 14:23:51 +12:00
# Here, although there is no record from which to get the zero
2021-04-28 17:40:08 +12:00
# timestamp, record 4 does it anyway.
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , longer_ago )
self . assert_timestamps_equal ( timestamp2 , n_days_ago )
self . assert_timestamps_equal ( timestamp3 , n_days_ago )
self . assert_timestamps_equal ( timestamp4 , 0 )
2021-04-28 17:40:08 +12:00
# and now record 1 wants to be static.
self . ldap_update_record ( name , txt4 , dwTimeStamp = longer_ago )
timestamp4 = self . get_unique_txt_record ( name , txt4 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp4 , longer_ago )
2021-04-28 17:40:08 +12:00
timestamp1 = self . dns_update_record ( name , txt1 ) . dwTimeStamp
timestamp4 = self . get_unique_txt_record ( name , txt4 ) . dwTimeStamp
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( timestamp1 , 0 )
self . assert_timestamps_equal ( timestamp4 , longer_ago )
2021-04-28 17:40:08 +12:00
def test_update_aging_enabled_in_no_refresh_window ( self ) :
self . _test_update_aging_enabled_n_days_ago ( 4 )
def test_update_aging_enabled_on_no_refresh_boundary ( self ) :
self . _test_update_aging_enabled_n_days_ago ( 7 )
def test_update_aging_enabled_in_refresh_window ( self ) :
self . _test_update_aging_enabled_n_days_ago ( 9 )
def test_update_aging_enabled_beyond_refresh_window ( self ) :
self . _test_update_aging_enabled_n_days_ago ( 16 )
def test_update_aging_enabled_in_eighteenth_century ( self ) :
self . _test_update_aging_enabled_n_days_ago ( 100000 )
def test_update_static_stickiness ( self ) :
name = ' test '
A = [ ' A ' ]
B = [ ' B ' ]
C = [ ' C ' ]
D = [ ' D ' ]
self . set_aging ( False )
self . dns_update_record ( name , A ) . dwTimeStamp
self . ldap_update_record ( name , B , dwTimeStamp = 0 )
self . dns_update_record ( name , B )
self . dns_update_record ( name , C )
ctime = self . get_unique_txt_record ( name , C ) . dwTimeStamp
self . assertEqual ( ctime , 0 )
btime = self . get_unique_txt_record ( name , B ) . dwTimeStamp
self . assertEqual ( btime , 0 )
self . ldap_replace_records ( name , [ ] )
self . dns_update_record ( name , D )
dtime = self . get_unique_txt_record ( name , D ) . dwTimeStamp
self . assertEqual ( dtime , 0 )
def _test_update_timestamp_weirdness ( self , n_days , aging = True ) :
name = ' test '
A = [ ' A ' ]
B = [ ' B ' ]
self . set_aging ( aging )
current_time = self . dns_update_record ( name , A ) . dwTimeStamp
# rewind timestamp using ldap
self . ldap_modify_timestamps ( name , n_days * - 24 )
n_days_ago = self . get_unique_txt_record ( name , A ) . dwTimeStamp
time_A = self . dns_update_record ( name , A ) . dwTimeStamp
# that dns_update should have reset the timestamp ONLY if
# aging is on and the old timestamp is > noRefresh period (7
# days)
if n_days > 7 and aging :
self . assert_soon_after ( time_A , current_time )
else :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( time_A , n_days_ago )
2021-04-28 17:40:08 +12:00
# add another record, which should have the current timestamp
time_B = self . dns_update_record ( name , B ) . dwTimeStamp
self . assert_soon_after ( time_B , current_time )
time_A = self . get_unique_txt_record ( name , A ) . dwTimeStamp
if aging and n_days < = 7 :
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( time_A , n_days_ago )
2021-04-28 17:40:08 +12:00
else :
self . assert_soon_after ( time_A , current_time )
# delete B, try again
self . ldap_delete_record ( name , B )
self . ldap_update_record ( name , A , dwTimeStamp = n_days_ago )
time_A = self . dns_update_record ( name , A ) . dwTimeStamp
# here we are re-adding the deleted record
time_B = self . dns_update_record ( name , B ) . dwTimeStamp
self . assert_soon_after ( time_B , current_time )
time_A = self . get_unique_txt_record ( name , A ) . dwTimeStamp
return n_days_ago , time_A , time_B
def test_update_timestamp_weirdness_no_refresh_no_aging ( self ) :
n_days_ago , time_A , time_B = \
self . _test_update_timestamp_weirdness ( 5 , False )
# the timestamp of the SIBLING of the deleted, re-added record
# differs from the sibling of freshly added record.
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( time_A , n_days_ago )
2021-04-28 17:40:08 +12:00
def test_update_timestamp_weirdness_no_refresh_aging ( self ) :
n_days_ago , time_A , time_B = \
self . _test_update_timestamp_weirdness ( 5 , True )
# the timestamp of the SIBLING of the deleted, re-added record
# differs from the sibling of freshly added record.
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( time_A , n_days_ago )
2021-04-28 17:40:08 +12:00
def test_update_timestamp_weirdness_refresh_no_aging ( self ) :
n_days_ago , time_A , time_B = \
self . _test_update_timestamp_weirdness ( 9 , False )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( time_A , time_B )
2021-04-28 17:40:08 +12:00
def test_update_timestamp_weirdness_refresh_aging ( self ) :
n_days_ago , time_A , time_B = \
self . _test_update_timestamp_weirdness ( 9 , True )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( time_A , time_B )
2021-04-28 17:40:08 +12:00
def test_aging_refresh ( self ) :
name , txt = ' agingtest ' , [ ' test txt ' ]
2021-06-19 14:52:47 +12:00
no_refresh = 200
refresh = 160
2021-04-28 17:40:08 +12:00
self . set_zone_int_params ( NoRefreshInterval = no_refresh ,
RefreshInterval = refresh ,
Aging = 1 )
before_mod = self . dns_update_record ( name , txt )
start_time = before_mod . dwTimeStamp
# go back 86 hours, which is in the no-refresh time (but
2021-06-19 14:52:47 +12:00
# wouldn't be if we had stuck to the default of 168).
self . ldap_modify_timestamps ( name , - 170 )
2021-04-28 17:40:08 +12:00
rec = self . dns_update_record ( name , txt )
2021-06-10 23:31:09 +00:00
self . assert_timestamps_equal ( rec . dwTimeStamp ,
2021-06-19 14:52:47 +12:00
start_time - 170 )
2021-04-28 17:40:08 +12:00
2021-06-19 14:52:47 +12:00
# back to -202 hours, into the refresh zone
2021-04-28 17:40:08 +12:00
# the update should reset the timestamp to now.
2021-06-19 14:52:47 +12:00
self . ldap_modify_timestamps ( name , - 32 )
2021-04-28 17:40:08 +12:00
rec = self . dns_update_record ( name , txt )
self . assert_soon_after ( rec . dwTimeStamp , start_time )
2021-06-19 14:52:47 +12:00
# back to -362 hours, beyond the end of the refresh period.
2021-04-28 17:40:08 +12:00
# Actually nothing changes at this time -- we can still
# refresh, but the record is liable for scavenging.
2021-06-19 14:52:47 +12:00
self . ldap_modify_timestamps ( name , - 160 )
2021-04-28 17:40:08 +12:00
rec = self . dns_update_record ( name , txt )
self . assert_soon_after ( rec . dwTimeStamp , start_time )
def test_add_no_timestamp ( self ) :
# check zero timestamp is implicit
self . set_aging ( True )
rec = self . ldap_update_record ( ' ldap ' , ' test ' )
self . assertEqual ( rec . dwTimeStamp , 0 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' )
self . assertEqual ( rec . dwTimeStamp , 0 )
def test_add_zero_timestamp ( self ) :
rec = self . ldap_update_record ( ' ldap ' , ' test ' , dwTimeStamp = 0 )
self . assertEqual ( rec . dwTimeStamp , 0 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwTimeStamp = 0 )
self . assertEqual ( rec . dwTimeStamp , 0 )
def test_add_update_timestamp ( self ) :
# LDAP can change timestamp, RPC can't
rec = self . ldap_update_record ( ' ldap ' , ' test ' , dwTimeStamp = 123456 )
self . assertEqual ( rec . dwTimeStamp , 123456 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwTimeStamp = 123456 )
self . assertEqual ( rec . dwTimeStamp , 0 )
# second time is a different code path (add vs update)
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwTimeStamp = 123456 )
self . assertEqual ( rec . dwTimeStamp , 0 )
# RPC update the one with timestamp, zeroing it.
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwTimeStamp = 123456 )
self . assertEqual ( rec . dwTimeStamp , 0 )
def test_add_update_ttl ( self ) :
# RPC *can* set dwTtlSeconds.
rec = self . ldap_update_record ( ' ldap ' , ' test ' ,
dwTtlSeconds = 1234 )
self . assertEqual ( rec . dwTtlSeconds , 1234 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwTtlSeconds = 1234 )
self . assertEqual ( rec . dwTtlSeconds , 1234 )
# does update work like add?
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwTtlSeconds = 4321 )
self . assertEqual ( rec . dwTtlSeconds , 4321 )
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwTtlSeconds = 5678 )
self . assertEqual ( rec . dwTtlSeconds , 5678 )
def test_add_update_ttl_serial ( self ) :
# when setting dwTtlSeconds, what happens to serial number?
rec = self . ldap_update_record ( ' ldap ' , ' test ' ,
dwTtlSeconds = 1234 ,
dwSerial = 123 )
self . assertEqual ( rec . dwTtlSeconds , 1234 )
self . assertEqual ( rec . dwSerial , 123 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwTtlSeconds = 1234 )
self . assertEqual ( rec . dwTtlSeconds , 1234 )
serial = rec . dwSerial
self . assertLess ( serial , 4 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwTtlSeconds = 4321 )
self . assertEqual ( rec . dwTtlSeconds , 4321 )
self . assertEqual ( rec . dwSerial , serial + 1 )
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwTtlSeconds = 5678 )
self . assertEqual ( rec . dwTtlSeconds , 5678 )
self . assertEqual ( rec . dwSerial , 124 )
def test_add_update_dwFlags ( self ) :
# dwFlags splits into rank and flags.
# according to [MS-DNSP] 2.3.2.2, flags MUST be zero
rec = self . ldap_update_record ( ' ldap ' , ' test ' , flags = 22222 , rank = 222 )
self . assertEqual ( rec . flags , 22222 )
self . assertEqual ( rec . rank , 222 )
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwFlags = 3333333 )
# rank != 3333333 & 0xff == 213
self . assertEqual ( rec . rank , 240 ) # RPC fixes rank
self . assertEqual ( rec . flags , 0 )
self . assertRaises ( OverflowError ,
self . ldap_update_record ,
' ldap ' , ' test ' , flags = 777777777 , rank = 777 )
# reset to no default (rank overflows)
rec = self . ldap_update_record ( ' ldap ' , ' test ' , flags = 7777 , rank = 777 )
self . assertEqual ( rec . flags , 7777 )
self . assertEqual ( rec . rank , 9 )
# DNS update zeros flags, sets rank to 240 (RANK_ZONE)
rec = self . dns_update_record ( ' ldap ' , ' test ' , ttl = 999 )
self . assertEqual ( rec . flags , 0 )
self . assertEqual ( rec . rank , 240 )
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwFlags = 321 )
self . assertEqual ( rec . flags , 0 )
self . assertEqual ( rec . rank , 240 )
# RPC adding a new record: fixed rank, zero flags
rec = self . rpc_update_record ( ' ldap ' , ' test 2 ' , dwFlags = 12345 )
self . assertEqual ( rec . rank , 240 )
self . assertEqual ( rec . flags , 0 )
def test_add_update_dwReserved ( self ) :
# RPC does not change dwReserved.
rec = self . ldap_update_record ( ' ldap ' , ' test ' , dwReserved = 54321 )
self . assertEqual ( rec . dwReserved , 54321 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwReserved = 54321 )
self . assertEqual ( rec . dwReserved , 0 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwReserved = 54321 )
self . assertEqual ( rec . dwReserved , 0 )
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwReserved = 12345 )
self . assertEqual ( rec . dwReserved , 54321 )
def test_add_update_dwSerial ( self ) :
# On Windows the RPC record ends up with serial 2, on Samba
# serial 3. Rather than knownfail this, we accept anything
# below 4 (for now).
rec = self . ldap_update_record ( ' ldap ' , ' test ' , dwSerial = 123 )
self . assertEqual ( rec . dwSerial , 123 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwSerial = 123 )
self . assertLess ( rec . dwSerial , 4 )
rec = self . rpc_update_record ( ' rpc ' , ' test ' , dwSerial = 123 )
self . assertLess ( rec . dwSerial , 4 )
rec = self . dns_update_record ( ' rpc ' , ' test ' )
self . assertLess ( rec . dwSerial , 4 )
rec = self . dns_update_record ( ' dns-0 ' , ' test ' )
self . assertLess ( rec . dwSerial , 5 )
rec = self . dns_update_record ( ' ldap ' , ' test ' )
self . assertEqual ( rec . dwSerial , 123 )
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwSerial = 123 )
self . assertEqual ( rec . dwSerial , 123 )
rec = self . ldap_update_record ( ' ldap ' , ' test ' , dwSerial = 12 )
self . assertEqual ( rec . dwSerial , 12 )
# when we dns-updated ldap/test, we alerted Windows to 123 as
# a high water mark for the zone. (even though we have since
# dropped the serial to 12, 123 is the base serial for new
# records).
rec = self . dns_update_record ( ' dns ' , ' test ' )
self . assertEqual ( rec . dwSerial , 124 )
rec = self . dns_update_record ( ' dns2 ' , ' test ' )
self . assertEqual ( rec . dwSerial , 125 )
rec = self . rpc_update_record ( ' rpc2 ' , ' test ' )
self . assertEqual ( rec . dwSerial , 126 )
rec = self . dns_update_record ( ' dns ' , ' test 2 ' )
self . assertEqual ( rec . dwSerial , 127 )
def test_add_update_dwSerial_2 ( self ) :
# On Samba the RPC update resets the serial to a low number,
# while Windows leaves it high.
rec = self . ldap_update_record ( ' ldap ' , ' test ' , dwSerial = 123 )
self . assertEqual ( rec . dwSerial , 123 )
rec = self . rpc_update_record ( ' ldap ' , ' test ' , dwSerial = 321 )
self . assertEqual ( rec . dwSerial , 123 )
rec = self . dns_update_record ( ' ldap ' , ' test ' )
self . assertEqual ( rec . dwSerial , 123 )
2021-06-17 23:10:50 +12:00
def test_rpc_update_disparate_types ( self ) :
""" Can we use update to replace a TXT with an AAAA? """
name = ' x '
old = TXTRecord ( " x " )
new = ARecord ( " 127.0.0.111 " )
self . rpc_replace ( name , None , old )
recs = self . ldap_get_records ( name )
self . assertEqual ( len ( recs ) , 1 )
self . assertEqual ( recs [ 0 ] . wType , old . wType )
self . rpc_replace ( name , old , new )
recs = self . ldap_get_records ( name )
self . assertEqual ( len ( recs ) , 1 )
self . assertEqual ( recs [ 0 ] . wType , new . wType )
2021-04-28 17:40:08 +12:00
def test_add_update_many ( self ) :
# Samba fails often in this set, but we want to see how it
# goes further down, so we print the problems and defer the
# failure.
failures = 0
total = 0
def _defer_wrap ( f ) :
def _defer ( * args ) :
nonlocal failures , total
total + = 1
try :
f ( * args )
except self . failureException as e :
from traceback import format_stack
print ( f " { format_stack ( ) [ - 2 ] } { e } \n " )
failures + = 1
return _defer
defer_assertEqual = _defer_wrap ( self . assertEqual )
defer_assert_timestamp_in_ballpark = \
_defer_wrap ( self . assert_timestamp_in_ballpark )
self . set_aging ( False )
rec = self . ldap_update_record ( ' ldap ' , ' test ' ,
version = 11 ,
rank = 22 ,
flags = 33 ,
dwSerial = 44 ,
dwTtlSeconds = 55 ,
dwReserved = 66 ,
dwTimeStamp = 77 )
self . assertEqual ( rec . version , 5 ) # disobeys request
self . assertEqual ( rec . rank , 22 )
self . assertEqual ( rec . flags , 33 )
self . assertEqual ( rec . dwSerial , 44 )
self . assertEqual ( rec . dwTtlSeconds , 55 )
self . assertEqual ( rec . dwReserved , 66 )
self . assertEqual ( rec . dwTimeStamp , 77 )
# DNS updates first
rec = self . dns_update_record ( ' ldap ' , ' test ' , ttl = 999 )
self . assertEqual ( rec . version , 5 )
self . assertEqual ( rec . rank , 240 ) # rank gets fixed by DNS update
defer_assertEqual ( rec . flags , 0 ) # flags gets fixed
defer_assertEqual ( rec . dwSerial , 45 ) # serial increments
self . assertEqual ( rec . dwTtlSeconds , 999 ) # TTL set
defer_assertEqual ( rec . dwReserved , 0 ) # reserved fixed
defer_assert_timestamp_in_ballpark ( rec ) # changed on Windows ?!
self . set_aging ( True )
rec = self . dns_update_record ( ' ldap ' , ' test ' , ttl = 1111 )
self . assertEqual ( rec . version , 5 )
self . assertEqual ( rec . rank , 240 )
defer_assertEqual ( rec . flags , 0 )
defer_assertEqual ( rec . dwSerial , 46 )
self . assertEqual ( rec . dwTtlSeconds , 1111 ) # TTL set
defer_assertEqual ( rec . dwReserved , 0 )
self . assert_timestamp_in_ballpark ( rec )
# RPC update
rec = self . rpc_update_record ( ' ldap ' , ' test ' ,
version = 111 ,
dwFlags = 333 ,
dwSerial = 444 ,
dwTtlSeconds = 555 ,
dwReserved = 666 ,
dwTimeStamp = 777 )
self . assertEqual ( rec . version , 5 ) # no change
self . assertEqual ( rec . rank , 240 ) # no change
defer_assertEqual ( rec . flags , 0 ) # no change
defer_assertEqual ( rec . dwSerial , 47 ) # Serial increments
self . assertEqual ( rec . dwTtlSeconds , 555 ) # TTL set
defer_assertEqual ( rec . dwReserved , 0 ) # no change
self . assertEqual ( rec . dwTimeStamp , 0 ) # timestamp zeroed
# RPC update, using default values
rec = self . rpc_update_record ( ' ldap ' , ' test ' )
self . assertEqual ( rec . version , 5 )
self . assertEqual ( rec . rank , 240 )
defer_assertEqual ( rec . flags , 0 )
defer_assertEqual ( rec . dwSerial , 48 ) # serial increments
self . assertEqual ( rec . dwTtlSeconds , 900 ) # TTL changed
defer_assertEqual ( rec . dwReserved , 0 )
self . assertEqual ( rec . dwTimeStamp , 0 )
self . set_aging ( False )
rec = self . dns_update_record ( ' ldap ' , ' test ' , ttl = 888 )
self . assertEqual ( rec . version , 5 )
self . assertEqual ( rec . rank , 240 )
defer_assertEqual ( rec . flags , 0 )
defer_assertEqual ( rec . dwSerial , 49 ) # serial increments
self . assertEqual ( rec . dwTtlSeconds , 888 ) # TTL set
defer_assertEqual ( rec . dwReserved , 0 )
self . assertEqual ( rec . dwTimeStamp , 0 ) # timestamp stays zero
if failures :
2023-06-06 13:28:13 +02:00
self . fail ( f " failed { failures } / { total } deferred assertions " )
2021-04-28 17:40:08 +12:00
def test_static_record_dynamic_update ( self ) :
""" Add a static record, then a dynamic record.
The dynamic record should have a timestamp set . """
name = ' test '
txt = [ ' static txt ' ]
txt2 = [ ' dynamic txt ' ]
self . set_aging ( True )
rec = self . ldap_update_record ( name , txt , dwTimeStamp = 0 )
rec2 = self . dns_update_record ( name , txt2 )
self . assert_timestamp_in_ballpark ( rec2 )
ts2 = rec2 . dwTimeStamp
# update the first record. It should stay static (timestamp 0)
rec = self . dns_update_record ( name , txt )
self . assertEqual ( rec . dwTimeStamp , 0 )
# and rec2 should be unchanged.
self . assertEqual ( rec2 . dwTimeStamp , ts2 )
def test_dynamic_record_static_update ( self ) :
name = ' agingtest '
txt1 = [ ' dns update before ' ]
txt2 = [ ' ldap update ' ]
txt3 = [ ' dns update after ' ]
self . set_aging ( True )
self . dns_update_record ( name , txt1 )
self . ldap_update_record ( name , txt2 )
self . dns_update_record ( name , txt3 )
recs = self . get_rpc_records ( name )
for r in recs :
d = [ x . str for x in r . data . str ]
if d == txt1 :
self . assertNotEqual ( r . dwTimeStamp , 0 )
elif d == txt2 :
self . assertEqual ( r . dwTimeStamp , 0 )
elif d == txt3 :
self . assertNotEqual ( r . dwTimeStamp , 0 )
2021-06-16 22:48:56 +00:00
def test_tombstone_in_hours_and_nttime ( self ) :
# Until now Samba has measured tombstone timestamps in hours,
# not ten-millionths of a second. After now, we want Samba to
# handle both.
nh , oh , nn , on , on0 , onf , nn0 , nnf , _1601 = ' abcdefgij '
now_hours = dsdb_dns . unix_to_dns_timestamp ( int ( time . time ( ) ) )
old_hours = now_hours - 24 * 90
now_nttime = dsdb_dns . dns_timestamp_to_nt_time ( now_hours )
old_nttime = dsdb_dns . dns_timestamp_to_nt_time ( old_hours )
# calculations on hours might be based on the lower 32 bits,
# so we test with these forced to extremes (the maximum change
# is 429 seconds in NTTIME).
old_nttime0 = old_nttime & 0xffffffff00000000
old_nttimef = old_nttime | 0xffffffff
now_nttime0 = now_nttime & 0xffffffff00000000
now_nttimef = now_nttime | 0xffffffff
self . dns_tombstone ( nh , epoch_nttime = now_hours )
self . dns_tombstone ( oh , epoch_nttime = old_hours )
self . dns_tombstone ( nn , epoch_nttime = now_nttime )
self . dns_tombstone ( on , epoch_nttime = old_nttime )
self . dns_tombstone ( nn0 , epoch_nttime = now_nttime0 )
self . dns_tombstone ( nnf , epoch_nttime = now_nttimef )
self . dns_tombstone ( on0 , epoch_nttime = old_nttime0 )
self . dns_tombstone ( onf , epoch_nttime = old_nttimef )
# this is our (arbitrary) threshold that will make us think in
# NTTIME, not hours.
self . dns_tombstone ( _1601 , epoch_nttime = ( 10 * 1000 * 1000 + 1 ) )
try :
file_samdb = get_file_samdb ( )
except ldb . LdbError as e :
raise AssertionError (
f " failing because ' { e } ' : this is Windows? " ) from None
dsdb . _dns_delete_tombstones ( file_samdb )
# nh and nn should not be deleted
for name in nh , nn , nn0 , nnf :
recs = self . ldap_get_records ( name )
self . assertEqual ( len ( recs ) , 1 )
self . assert_tombstoned ( name , timestamp = False )
# oh and on should be GONE
for name in oh , on , on0 , onf , _1601 :
recs = self . ldap_get_records ( name )
self . assertEqual ( len ( recs ) , 0 )
2021-06-14 23:41:07 +12:00
def test_dns_query_for_tombstoned_results ( self ) :
# This one fails on Windows, because the dns cache holds B
# after it has been tombstoned behind its back.
A = ' a '
B = ' b '
self . dns_tombstone ( A )
self . assert_tombstoned ( A )
r = self . dns_query ( A , qtype = dns . DNS_QTYPE_TXT )
self . assertEqual ( r . ancount , 0 )
self . dns_update_record ( B , B )
self . dns_tombstone ( B )
self . assert_tombstoned ( B )
r = self . dns_query ( B , qtype = dns . DNS_QTYPE_TXT )
self . assertEqual ( r . ancount , 0 )
2021-04-28 17:40:08 +12:00
def test_basic_scavenging ( self ) :
# NOTE: This one fails on Windows, because the RPC call to
# prompt scavenging is not immediate. On Samba, in the
# testenv, we don't have the RPC call but we can connect to
# the database directly.
# just to be sure we have the right limits.
2021-06-19 14:52:47 +12:00
self . set_zone_int_params ( NoRefreshInterval = 168 ,
RefreshInterval = 168 ,
2021-04-28 17:40:08 +12:00
Aging = 1 )
ts1 , ts2 , ts3 , ts4 , ts5 , ts6 = ( ' 1 ' , ' 2 ' , ' 3 ' , ' 4 ' , ' 5 ' , ' 6 ' )
self . dns_update_record ( ts1 , ts1 )
self . dns_update_record ( ts2 , ts2 )
# ts2 is tombstoned and timestamped in 1981
self . dns_tombstone ( ts2 )
# ts3 is tombstoned and timestamped in the future
self . dns_tombstone ( ts3 , epoch_hours = ( DNS_TIMESTAMP_2101 - 1 ) )
# ts4 is tombstoned and timestamped in the past
self . dns_tombstone ( ts4 , epoch_hours = 1111111 )
# ts5 is tombstoned in the past and timestamped in the future
self . dns_tombstone ( ts5 , epoch_hours = 5555555 , epoch_nttime = int ( 1e10 ) )
# ts2 and ts3 should now be tombstoned.
self . assert_tombstoned ( ts2 )
self . assert_tombstoned ( ts3 )
# let's un-tombstone ts2
# ending up with dnsTombstoned: FALSE in Samba
# and no dNSTombstoned in Windows.
self . dns_update_record ( ts2 , " ts2 untombstoned " )
ts2_node = self . get_one_node ( ts2 )
ts2_tombstone = ts2_node . get ( " dNSTombstoned " )
if ts2_tombstone is not None :
self . assertEqual ( ts2_tombstone [ 0 ] , b " FALSE " )
self . assert_tombstoned ( ts2 , tombstoned = False )
r = self . dns_update_record ( ts6 , ts6 )
# put some records into the death zone.
self . ldap_modify_timestamps ( ts1 , - 15 * 24 )
self . ldap_modify_timestamps ( ts2 , - 14 * 24 - 2 )
self . ldap_modify_timestamps ( ts6 , - 14 * 24 + 2 )
# ts1 will be saved by this record
self . dns_update_record ( ts1 , " another record " )
try :
# Tell the server to clean-up records.
# This is how it *should* work on Windows:
self . rpc_conn . DnssrvOperation2 (
dnsserver . DNS_CLIENT_VERSION_LONGHORN ,
0 ,
SERVER_IP ,
None ,
0 ,
" StartScavenging " ,
dnsserver . DNSSRV_TYPEID_NULL ,
None )
# Samba won't get here (NOT_IMPLEMENTED error)
# wait for Windows to do its cleanup.
time . sleep ( 2 )
except WERRORError as e :
if e . args [ 0 ] == werror . WERR_CALL_NOT_IMPLEMENTED :
# This is the Samba way, talking to the file directly,
# as if we were the server process. The direct
# connection is needed because the tombstoning search
# involves a magic system only filter.
file_samdb = get_file_samdb ( )
dsdb . _scavenge_dns_records ( file_samdb )
dsdb . _dns_delete_tombstones ( file_samdb )
else :
raise
# Now what we should have:
# ts1: alive: the old record is deleted, the new one not.
# ts2: tombstoned
# ts3: tombstoned
# ts4: deleted. gone.
# ts5: deleted. timestamp affects tombstoning, but not deletion.
# ts6: alive
#
# We order our assertions to make the windows test
# fail as late as possible (on ts4, ts5, ts2).
r = self . get_unique_txt_record ( ts1 , [ " another record " ] )
self . assertIsNotNone ( r )
r = self . get_unique_txt_record ( ts6 , [ ts6 ] )
self . assertIsNotNone ( r )
self . assert_tombstoned ( ts3 )
n = self . get_one_node ( ts4 )
self . assertIsNone ( n )
n = self . get_one_node ( ts5 )
self . assertIsNone ( n )
self . assert_tombstoned ( ts2 )
2021-06-14 23:41:14 +12:00
def test_samba_scavenging ( self ) :
# We expect this one to fail on Windows, because scavenging
# and tombstoning cannot be performed on demand.
try :
file_samdb = get_file_samdb ( )
except ldb . LdbError as e :
raise AssertionError (
f " failing because ' { e } ' : this is Windows? " ) from None
# let's try different limits.
self . set_zone_int_params ( NoRefreshInterval = 30 ,
RefreshInterval = 20 ,
Aging = 1 )
now = dsdb_dns . unix_to_dns_timestamp ( int ( time . time ( ) ) )
A , B , C , D = ' ABCD '
# A has current time
# B has safe, non-updateable time
# C has safe time
# D is scavengeable
atime = self . dns_update_record ( A , A ) . dwTimeStamp
btime = self . ldap_update_record ( B , B , dwTimeStamp = now - 20 ) . dwTimeStamp
2021-11-19 16:12:43 +13:00
ctime = self . ldap_update_record ( C , C , dwTimeStamp = now - 40 ) . dwTimeStamp
2021-06-14 23:41:14 +12:00
dtime = self . ldap_update_record ( D , D , dwTimeStamp = now - 60 ) . dwTimeStamp
self . assert_soon_after ( atime , now )
self . assert_timestamps_equal ( btime , now - 20 )
self . assert_timestamps_equal ( ctime , now - 40 )
self . assert_timestamps_equal ( dtime , now - 60 )
dsdb . _scavenge_dns_records ( file_samdb )
# D should be gone (tombstoned)
r = self . get_unique_txt_record ( D , D )
self . assertIsNone ( r )
2021-11-19 16:12:43 +13:00
r = self . dns_query ( D , qtype = dns . DNS_QTYPE_TXT )
2021-06-14 23:41:14 +12:00
self . assertEqual ( r . ancount , 0 )
recs = self . ldap_get_records ( D )
self . assertEqual ( len ( recs ) , 1 )
2023-08-29 17:31:14 +12:00
self . assert_tombstoned ( D )
2021-06-14 23:41:14 +12:00
# others unchanged.
atime = self . get_unique_txt_record ( A , A ) . dwTimeStamp
btime = self . get_unique_txt_record ( B , B ) . dwTimeStamp
ctime = self . get_unique_txt_record ( C , C ) . dwTimeStamp
self . assert_soon_after ( atime , now )
self . assert_timestamps_equal ( btime , now - 20 )
self . assert_timestamps_equal ( ctime , now - 40 )
btime = self . dns_update_record ( B , B ) . dwTimeStamp
ctime = self . dns_update_record ( C , C ) . dwTimeStamp
self . assert_timestamps_equal ( btime , now - 40 )
self . assert_soon_after ( ctime , now )
# after this, D *should* still be a tombstone, because its
# tombstone timestamp is not very old.
dsdb . _dns_delete_tombstones ( file_samdb )
recs = self . ldap_get_records ( D )
self . assertEqual ( len ( recs ) , 1 )
2023-08-29 17:31:14 +12:00
self . assert_tombstoned ( D )
2021-06-14 23:41:14 +12:00
# Let's delete C using rpc, and ensure it survives dns_delete_tombstones
self . rpc_delete_txt ( C , C )
recs = self . ldap_get_records ( C )
self . assertEqual ( len ( recs ) , 1 )
2023-08-29 17:31:14 +12:00
self . assert_tombstoned ( C )
2021-06-14 23:41:14 +12:00
dsdb . _dns_delete_tombstones ( file_samdb )
recs = self . ldap_get_records ( C )
self . assertEqual ( len ( recs ) , 1 )
2023-08-29 17:31:14 +12:00
self . assert_tombstoned ( C )
2021-06-14 23:41:14 +12:00
# now let's wind A and B back to either side of the two week
# threshold. A should survive, B should not.
self . dns_tombstone ( A , ( now - 166 ) )
self . dns_tombstone ( B , ( now - 170 ) )
dsdb . _dns_delete_tombstones ( file_samdb )
recs = self . ldap_get_records ( A )
self . assertEqual ( len ( recs ) , 1 )
2023-08-29 17:31:14 +12:00
self . assert_tombstoned ( A )
2021-06-14 23:41:14 +12:00
recs = self . ldap_get_records ( B )
self . assertEqual ( len ( recs ) , 0 )
2021-06-10 23:30:19 +00:00
def _test_A_and_AAAA_records ( self , A , B , a_days , b_days , aging ) :
self . set_aging ( aging )
name = ' aargh '
now = dsdb_dns . unix_to_dns_timestamp ( int ( time . time ( ) ) )
a_initial = now - 24 * a_days
b_initial = now - 24 * b_days
self . dns_update_non_text ( name , A )
self . ldap_modify_timestamps ( name , a_days * - 24 )
rec_a = self . get_unique_ip_record ( name , A )
rec_b = self . add_ip_record ( name , B , dwTimeStamp = b_initial )
self . assert_timestamps_equal ( rec_a , a_initial )
self . assert_timestamps_equal ( rec_b , b_initial )
# touch the A record.
self . dns_update_non_text ( name , A )
# check the A timestamp, depending on norefresh
rec_a = self . get_unique_ip_record ( name , A )
if aging and a_days > 7 :
time_a = now
self . assert_soon_after ( rec_a , now )
elif a_days > 7 :
# when we have NO aging and are in the refresh window, the
# timestamp now reads as a_initial, but will become now
# after we manipulate B for a bit.
time_a = now
self . assert_timestamps_equal ( rec_a , a_initial )
else :
time_a = a_initial
self . assert_timestamps_equal ( rec_a , a_initial )
# B timestamp should be unchanged?
rec_b = self . get_unique_ip_record ( name , B )
self . assert_timestamps_equal ( rec_b , b_initial )
# touch the B record.
self . dns_update_non_text ( name , B )
# check the B timestamp
rec_b = self . get_unique_ip_record ( name , B )
2021-06-20 10:18:46 +12:00
if not aging :
self . windows_variation (
self . assert_soon_after , rec_b , now ,
msg = " windows updates non-aging, samba does not " )
else :
self . assert_soon_after ( rec_b , now )
2021-06-10 23:30:19 +00:00
# rewind B
rec_b = self . add_ip_record ( name , B , dwTimeStamp = b_initial )
# NOW rec A might have changed! with no aging, and out of refresh.
rec_a = self . get_unique_ip_record ( name , A )
self . assert_timestamps_equal ( rec_a , time_a )
self . dns_update_non_text ( name , A )
rec_a = self . get_unique_ip_record ( name , B )
self . assert_timestamps_equal ( rec_b , b_initial )
# now delete A
_ , wtype = guess_wtype ( A )
self . ldap_delete_record ( name , A , wtype = wtype )
# re-add it
self . dns_update_non_text ( name , A )
rec_a = self . get_unique_ip_record ( name , A )
self . assert_soon_after ( rec_a , now )
rec_b = self . get_unique_ip_record ( name , B )
self . assert_timestamps_equal ( rec_b , b_initial )
def test_A_5_days_AAAA_5_days_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 5 , 5 , aging = True )
def test_A_5_days_AAAA_5_days_no_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 5 , 5 , aging = False )
def test_A_5_days_AAAA_10_days_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 5 , 10 , aging = True )
def test_A_5_days_AAAA_10_days_no_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 5 , 10 , aging = False )
def test_A_10_days_AAAA_5_days_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 10 , 5 , aging = True )
def test_A_10_days_AAAA_5_days_no_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 10 , 5 , aging = False )
def test_A_10_days_AAAA_9_days_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 10 , 9 , aging = True )
def test_A_9_days_AAAA_10_days_no_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 9 , 10 , aging = False )
def test_A_20_days_AAAA_2_days_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 20 , 2 , aging = True )
def test_A_6_days_AAAA_40_days_no_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv6_ADDR , 6 , 40 , aging = False )
def test_A_5_days_A_5_days_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv4_ADDR_2 , 5 , 5 , aging = True )
def test_A_5_days_A_10_days_no_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv4_ADDR , IPv4_ADDR_2 , 5 , 10 , aging = False )
def test_AAAA_5_days_AAAA_6_days_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv6_ADDR , IPv6_ADDR_2 , 5 , 6 , aging = True )
def test_AAAA_5_days_AAAA_6_days_no_aging ( self ) :
self . _test_A_and_AAAA_records ( IPv6_ADDR , IPv6_ADDR_2 , 5 , 6 , aging = False )
2021-06-18 18:32:22 +12:00
def _test_multi_records_delete ( self , aging ) :
# Batch deleting a type doesn't update other types timestamps.
self . set_aging ( aging )
name = ' aargh '
now = dsdb_dns . unix_to_dns_timestamp ( int ( time . time ( ) ) )
back_5_days = now - 5 * 24
back_10_days = now - 10 * 24
back_25_days = now - 25 * 24
ip4s = {
' 1.1.1.1 ' : now ,
' 2.2.2.2 ' : back_5_days ,
' 3.3.3.3 ' : back_10_days ,
}
ip6s = {
' ::1 ' : now ,
' ::2 ' : back_5_days ,
' ::3 ' : back_25_days ,
}
txts = {
' 1 ' : now ,
' 2 ' : back_5_days ,
' 3 ' : back_25_days ,
}
# For windows, if we don't DNS update something, it won't know
# there's anything.
self . dns_update_record ( name , ' 3 ' )
for k , v in ip4s . items ( ) :
r = self . add_ip_record ( name , k , wtype = dns . DNS_QTYPE_A , dwTimeStamp = v )
for k , v in ip6s . items ( ) :
r = self . add_ip_record ( name , k , wtype = dns . DNS_QTYPE_AAAA , dwTimeStamp = v )
for k , v in txts . items ( ) :
r = self . ldap_update_record ( name , k , dwTimeStamp = v )
self . dns_delete_type ( name , dnsp . DNS_TYPE_A )
r = self . dns_query ( name , dns . DNS_QTYPE_A )
self . assertEqual ( r . ancount , 0 )
r = self . dns_query ( name , dns . DNS_QTYPE_TXT )
self . assertEqual ( r . ancount , 3 )
rset = set ( x . rdata . txt . str [ 0 ] for x in r . answers )
self . assertEqual ( rset , set ( txts ) )
r = self . dns_query ( name , dns . DNS_QTYPE_AAAA )
self . assertEqual ( r . ancount , 3 )
rset = set ( ipv6_normalise ( x . rdata ) for x in r . answers )
self . assertEqual ( rset , set ( ip6s ) )
recs = self . ldap_get_records ( name )
self . assertEqual ( len ( recs ) , 6 )
for r in recs :
if r . wType == dns . DNS_QTYPE_AAAA :
k = ipv6_normalise ( r . data )
expected = ip6s [ k ]
elif r . wType == dns . DNS_QTYPE_TXT :
k = r . data . str [ 0 ]
expected = txts [ k ]
else :
self . fail ( f " unexpected wType { r . wType } " )
self . assert_timestamps_equal ( r . dwTimeStamp , expected )
def test_multi_records_delete_aging ( self ) :
self . _test_multi_records_delete ( True )
def test_multi_records_delete_no_aging ( self ) :
self . _test_multi_records_delete ( False )
2021-06-14 23:39:53 +12:00
def _test_dns_delete_times ( self , n_days , aging = True ) :
# In these tests, Windows replaces the records with
# tombstones, while Samba just removes them. Both are
# reasonable approaches (there is no reanimation pathway for
# tombstones), but this means self.ldap_get_records() gets
# different numbers for each. So we use
# self.ldap_get_non_tombstoned_record().
name = ' test '
A = [ ' A ' ]
B = [ ' B ' ]
C = [ ' C ' ]
D = [ ' D ' ]
self . set_aging ( aging )
now = dsdb_dns . unix_to_dns_timestamp ( int ( time . time ( ) ) )
n_days_ago = max ( now - n_days * 24 , 0 )
self . dns_update_record ( name , A )
self . ldap_update_record ( name , A , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , B , dwTimeStamp = n_days_ago )
self . ldap_update_record ( name , C , dwTimeStamp = n_days_ago )
self . dns_update_record ( name , D )
r = self . dns_query ( name , dns . DNS_QTYPE_TXT )
rset = set ( x . rdata . txt . str [ 0 ] for x in r . answers )
self . assertEqual ( rset , set ( ' ABCD ' ) )
atime = self . get_unique_txt_record ( name , A ) . dwTimeStamp
btime = self . get_unique_txt_record ( name , B ) . dwTimeStamp
ctime = self . get_unique_txt_record ( name , C ) . dwTimeStamp
dtime = self . get_unique_txt_record ( name , D ) . dwTimeStamp
recs = self . ldap_get_records ( name )
self . assertEqual ( len ( recs ) , 4 )
r = self . dns_query ( name , dns . DNS_QTYPE_TXT )
rset = set ( x . rdata . txt . str [ 0 ] for x in r . answers )
self . assertEqual ( rset , set ( ' ABCD ' ) )
2023-08-28 13:39:03 +12:00
self . assert_timestamps_equal ( dtime , self . get_unique_txt_record ( name , D ) )
2021-06-14 23:39:53 +12:00
self . dns_delete ( name , D )
self . assert_timestamps_equal ( atime , self . get_unique_txt_record ( name , A ) )
self . assert_timestamps_equal ( btime , self . get_unique_txt_record ( name , B ) )
self . assert_timestamps_equal ( ctime , self . get_unique_txt_record ( name , C ) )
recs = self . ldap_get_non_tombstoned_records ( name )
self . assertEqual ( len ( recs ) , 3 )
r = self . dns_query ( name , dns . DNS_QTYPE_TXT )
rset = set ( x . rdata . txt . str [ 0 ] for x in r . answers )
self . assertEqual ( rset , set ( ' ABC ' ) )
self . rpc_delete_txt ( name , C )
self . assert_timestamps_equal ( atime , self . get_unique_txt_record ( name , A ) )
self . assert_timestamps_equal ( btime , self . get_unique_txt_record ( name , B ) )
recs = self . ldap_get_non_tombstoned_records ( name )
self . assertEqual ( len ( recs ) , 2 )
r = self . dns_query ( name , dns . DNS_QTYPE_TXT )
rset = set ( x . rdata . txt . str [ 0 ] for x in r . answers )
self . assertEqual ( rset , set ( ' AB ' ) )
self . dns_delete ( name , A )
self . assert_timestamps_equal ( btime , self . get_unique_txt_record ( name , B ) )
recs = self . ldap_get_records ( name )
self . assertEqual ( len ( recs ) , 1 )
r = self . dns_query ( name , dns . DNS_QTYPE_TXT )
rset = set ( x . rdata . txt . str [ 0 ] for x in r . answers )
self . assertEqual ( rset , { ' B ' } )
self . dns_delete ( name , B )
recs = self . ldap_get_non_tombstoned_records ( name )
# Windows leaves the node with zero records. Samba ends up
# with a tombstone.
self . assertEqual ( len ( recs ) , 0 )
r = self . dns_query ( name , dns . DNS_QTYPE_TXT )
rset = set ( x . rdata . txt . str [ 0 ] for x in r . answers )
self . assertEqual ( len ( rset ) , 0 )
def test_dns_delete_times_5_days_aging ( self ) :
self . _test_dns_delete_times ( 5 , True )
def test_dns_delete_times_11_days_aging ( self ) :
self . _test_dns_delete_times ( 11 , True )
def test_dns_delete_times_366_days_aging ( self ) :
self . _test_dns_delete_times ( 366 , True )
def test_dns_delete_times_static_aging ( self ) :
self . _test_dns_delete_times ( 1e10 , True )
def test_dns_delete_times_5_days_no_aging ( self ) :
self . _test_dns_delete_times ( 5 , False )
def test_dns_delete_times_11_days_no_aging ( self ) :
self . _test_dns_delete_times ( 11 , False )
def test_dns_delete_times_366_days_no_aging ( self ) :
self . _test_dns_delete_times ( 366 , False )
def test_dns_delete_times_static_no_aging ( self ) :
self . _test_dns_delete_times ( 1e10 , False )
2021-06-19 13:35:54 +12:00
def _test_dns_delete_simple ( self , a_days , b_days , aging = True , touch = False ) :
# Here we show that with aging enabled, the timestamp of
# sibling records is *not* modified when a record is deleted.
#
# With aging disabled, it *is* modified, if the dns server has
# seen it updated before ldap set the time (that is, probably
# the dns server overwrites AD). This happens even if AD
# thinks the record is static.
name = ' test '
A = [ ' A ' ]
B = [ ' B ' ]
self . set_aging ( aging )
now = dsdb_dns . unix_to_dns_timestamp ( int ( time . time ( ) ) )
a_days_ago = max ( now - a_days * 24 , 0 )
b_days_ago = max ( now - b_days * 24 , 0 )
if touch :
self . dns_update_record ( name , A )
self . dns_update_record ( name , B )
self . ldap_update_record ( name , A , dwTimeStamp = a_days_ago )
self . ldap_update_record ( name , B , dwTimeStamp = b_days_ago )
atime = self . get_unique_txt_record ( name , A ) . dwTimeStamp
self . dns_delete ( name , B )
if not aging and touch :
# this resets the timestamp even if it is a static record.
self . assert_soon_after ( self . get_unique_txt_record ( name , A ) , now )
else :
self . assert_timestamps_equal ( self . get_unique_txt_record ( name , A ) , atime )
def test_dns_delete_simple_2_3_days_aging ( self ) :
self . _test_dns_delete_simple ( 2 , 3 , True )
def test_dns_delete_simple_2_3_days_no_aging ( self ) :
self . _test_dns_delete_simple ( 2 , 3 , False )
def test_dns_delete_simple_2_13_days_aging ( self ) :
self . _test_dns_delete_simple ( 2 , 13 , True )
def test_dns_delete_simple_2_13_days_no_aging ( self ) :
self . _test_dns_delete_simple ( 2 , 13 , False )
def test_dns_delete_simple_12_13_days_aging ( self ) :
self . _test_dns_delete_simple ( 12 , 13 , True )
def test_dns_delete_simple_12_13_days_no_aging ( self ) :
self . _test_dns_delete_simple ( 12 , 13 , False )
def test_dns_delete_simple_112_113_days_aging ( self ) :
self . _test_dns_delete_simple ( 112 , 113 , True )
def test_dns_delete_simple_112_113_days_no_aging ( self ) :
self . _test_dns_delete_simple ( 112 , 113 , False )
def test_dns_delete_simple_0_113_days_aging ( self ) :
# 1e9 hours ago evaluates to 0, i.e static
self . _test_dns_delete_simple ( 1e9 , 113 , True )
def test_dns_delete_simple_0_113_days_no_aging ( self ) :
self . _test_dns_delete_simple ( 1e9 , 113 , False )
def test_dns_delete_simple_0_0_days_aging ( self ) :
self . _test_dns_delete_simple ( 1e9 , 1e9 , True )
def test_dns_delete_simple_0_0_days_no_aging ( self ) :
self . _test_dns_delete_simple ( 1e9 , 1e9 , False )
def test_dns_delete_simple_10_0_days_aging ( self ) :
self . _test_dns_delete_simple ( 10 , 1e9 , True )
def test_dns_delete_simple_10_0_days_no_aging ( self ) :
self . _test_dns_delete_simple ( 10 , 1e9 , False )
def test_dns_delete_simple_2_3_days_aging_touch ( self ) :
self . _test_dns_delete_simple ( 2 , 3 , True , True )
def test_dns_delete_simple_2_3_days_no_aging_touch ( self ) :
self . _test_dns_delete_simple ( 2 , 3 , False , True )
def test_dns_delete_simple_2_13_days_aging_touch ( self ) :
self . _test_dns_delete_simple ( 2 , 13 , True , True )
def test_dns_delete_simple_2_13_days_no_aging_touch ( self ) :
self . _test_dns_delete_simple ( 2 , 13 , False , True )
def test_dns_delete_simple_12_13_days_aging_touch ( self ) :
self . _test_dns_delete_simple ( 12 , 13 , True , True )
def test_dns_delete_simple_12_13_days_no_aging_touch ( self ) :
self . _test_dns_delete_simple ( 12 , 13 , False , True )
def test_dns_delete_simple_112_113_days_aging_touch ( self ) :
self . _test_dns_delete_simple ( 112 , 113 , True , True )
def test_dns_delete_simple_112_113_days_no_aging_touch ( self ) :
self . _test_dns_delete_simple ( 112 , 113 , False , True )
def test_dns_delete_simple_0_113_days_aging_touch ( self ) :
# 1e9 hours ago evaluates to 0, i.e static
self . _test_dns_delete_simple ( 1e9 , 113 , True , True )
def test_dns_delete_simple_0_113_days_no_aging_touch ( self ) :
self . _test_dns_delete_simple ( 1e9 , 113 , False , True )
def test_dns_delete_simple_0_0_days_aging_touch ( self ) :
self . _test_dns_delete_simple ( 1e9 , 1e9 , True , True )
def test_dns_delete_simple_0_0_days_no_aging_touch ( self ) :
self . _test_dns_delete_simple ( 1e9 , 1e9 , False , True )
def test_dns_delete_simple_10_0_days_aging_touch ( self ) :
self . _test_dns_delete_simple ( 10 , 1e9 , True , True )
def test_dns_delete_simple_10_0_days_no_aging_touch ( self ) :
self . _test_dns_delete_simple ( 10 , 1e9 , False , True )
2021-06-19 15:43:29 +12:00
def windows_variation ( self , fn , * args , msg = None , * * kwargs ) :
try :
fn ( * args , * * kwargs )
except AssertionError as e :
print ( " Expected success on Windows only, failed as expected: \n " +
c_GREEN ( e ) )
return
print ( c_RED ( " known Windows failure " ) )
if msg is not None :
print ( c_DARK_YELLOW ( msg ) )
print ( " Expected success on Windows: \n " +
c_GREEN ( f " { fn . __name__ } { args } { kwargs } " ) )
2021-06-19 13:35:54 +12:00
2021-06-19 14:18:05 +12:00
def _test_dns_add_sibling ( self , a_days , refresh , aging = True , touch = False ) :
# Here we show that with aging enabled, the timestamp of
# sibling records *is* modified when a record is added.
#
# With aging disabled, it *is* modified, if the dns server has
# seen it updated before ldap set the time (that is, probably
# the dns server overwrites AD). This happens even if AD
# thinks the record is static.
name = ' test '
A = [ ' A ' ]
B = [ ' B ' ]
self . set_zone_int_params ( RefreshInterval = int ( refresh ) ,
NoRefreshInterval = 7 ,
Aging = int ( aging ) )
now = dsdb_dns . unix_to_dns_timestamp ( int ( time . time ( ) ) )
a_days_ago = max ( now - a_days * 24 , 0 )
if touch :
self . dns_update_record ( name , A )
self . ldap_update_record ( name , A , dwTimeStamp = a_days_ago )
atime = self . get_unique_txt_record ( name , A ) . dwTimeStamp
self . dns_update_record ( name , B )
a_rec = self . get_unique_txt_record ( name , A )
if not aging and touch :
2021-06-19 15:54:11 +12:00
# On Windows, this resets the timestamp even if it is a
# static record, though in that case it may be a
# transitory effect of the DNS cache. We will insist on
# the Samba behaviour of not changing (that is
# un-static-ing) a zero timestamp, because that is the
# sensible thing.
if a_days_ago == 0 :
self . windows_variation (
self . assert_soon_after , a_rec , now ,
msg = " Windows resets static siblings (cache effect?) " )
self . assert_timestamps_equal ( a_rec , 0 )
else :
self . assert_soon_after ( a_rec , now )
2021-06-19 14:18:05 +12:00
else :
self . assert_timestamps_equal ( a_rec , atime )
b_rec = self . get_unique_txt_record ( name , B )
self . assert_soon_after ( b_rec , now )
def test_dns_add_sibling_2_7_days_aging ( self ) :
self . _test_dns_add_sibling ( 2 , 7 , True )
def test_dns_add_sibling_2_7_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 2 , 7 , False )
def test_dns_add_sibling_12_7_days_aging ( self ) :
self . _test_dns_add_sibling ( 12 , 7 , True )
def test_dns_add_sibling_12_7_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 12 , 7 , False )
def test_dns_add_sibling_12_3_days_aging ( self ) :
self . _test_dns_add_sibling ( 12 , 3 , True )
def test_dns_add_sibling_12_3_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 12 , 3 , False )
def test_dns_add_sibling_112_7_days_aging ( self ) :
self . _test_dns_add_sibling ( 112 , 7 , True )
def test_dns_add_sibling_112_7_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 112 , 7 , False )
def test_dns_add_sibling_12_113_days_aging ( self ) :
self . _test_dns_add_sibling ( 12 , 113 , True )
def test_dns_add_sibling_12_113_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 12 , 113 , False )
def test_dns_add_sibling_0_7_days_aging ( self ) :
# 1e9 days ago evaluates to 0, i.e static
self . _test_dns_add_sibling ( 1e9 , 7 , True )
def test_dns_add_sibling_0_7_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 1e9 , 7 , False )
def test_dns_add_sibling_0_0_days_aging ( self ) :
self . _test_dns_add_sibling ( 1e9 , 0 , True )
def test_dns_add_sibling_0_0_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 1e9 , 0 , False )
def test_dns_add_sibling_10_0_days_aging ( self ) :
self . _test_dns_add_sibling ( 10 , 0 , True )
def test_dns_add_sibling_10_0_days_no_aging ( self ) :
self . _test_dns_add_sibling ( 10 , 0 , False )
def test_dns_add_sibling_2_7_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 2 , 7 , True , True )
def test_dns_add_sibling_2_7_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 2 , 7 , False , True )
def test_dns_add_sibling_12_7_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 12 , 7 , True , True )
def test_dns_add_sibling_12_7_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 12 , 7 , False , True )
def test_dns_add_sibling_12_3_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 12 , 3 , True , True )
def test_dns_add_sibling_12_3_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 12 , 3 , False , True )
def test_dns_add_sibling_112_7_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 112 , 7 , True , True )
def test_dns_add_sibling_112_7_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 112 , 7 , False , True )
def test_dns_add_sibling_12_113_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 12 , 113 , True , True )
def test_dns_add_sibling_12_113_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 12 , 113 , False , True )
def test_dns_add_sibling_0_7_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 1e9 , 7 , True , True )
def test_dns_add_sibling_0_7_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 1e9 , 7 , False , True )
def test_dns_add_sibling_0_0_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 1e9 , 0 , True , True )
def test_dns_add_sibling_0_0_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 1e9 , 0 , False , True )
def test_dns_add_sibling_10_0_days_aging_touch ( self ) :
self . _test_dns_add_sibling ( 10 , 0 , True , True )
def test_dns_add_sibling_10_0_days_no_aging_touch ( self ) :
self . _test_dns_add_sibling ( 10 , 0 , False , True )
2021-04-28 17:40:08 +12:00
TestProgram ( module = __name__ , opts = subunitopts )