2016-03-17 17:13:28 +13:00
# Unix SMB/CIFS implementation.
# Copyright (C) Kai Blin <kai@samba.org> 2011
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
2018-03-09 13:38:42 +00:00
from __future__ import print_function
2016-03-17 17:13:28 +13:00
import os
import sys
import struct
import random
import socket
import samba
import time
import errno
import samba . ndr as ndr
from samba import credentials , param
from samba . tests import TestCase
from samba . dcerpc import dns , dnsp , dnsserver
from samba . netcmd . dns import TXTRecord , dns_record_match , data_to_dns_record
from samba . tests . subunitrun import SubunitOptions , TestProgram
import samba . getopt as options
import optparse
import subprocess
2016-04-11 15:18:34 +12:00
parser = optparse . OptionParser ( " dns_forwarder.py <server name> <server ip> (dns forwarder)+ [options] " )
2016-03-17 17:13:28 +13:00
sambaopts = options . SambaOptions ( parser )
parser . add_option_group ( sambaopts )
# This timeout only has relevance when testing against Windows
# Format errors tend to return patchy responses, so a timeout is needed.
parser . add_option ( " --timeout " , type = " int " , dest = " timeout " ,
help = " Specify timeout for DNS requests " )
# use command line creds if available
credopts = options . CredentialsOptions ( parser )
parser . add_option_group ( credopts )
subunitopts = SubunitOptions ( parser )
parser . add_option_group ( subunitopts )
opts , args = parser . parse_args ( )
lp = sambaopts . get_loadparm ( )
creds = credopts . get_credentials ( lp )
timeout = opts . timeout
2016-04-11 15:18:34 +12:00
if len ( args ) < 3 :
2016-03-17 17:13:28 +13:00
parser . print_usage ( )
sys . exit ( 1 )
server_name = args [ 0 ]
server_ip = args [ 1 ]
2016-04-11 15:18:34 +12:00
dns_servers = args [ 2 : ]
2016-03-17 17:13:28 +13:00
creds . set_krb_forwardable ( credentials . NO_KRB_FORWARDABLE )
def make_txt_record ( records ) :
rdata_txt = dns . txt_record ( )
s_list = dnsp . string_list ( )
s_list . count = len ( records )
s_list . str = records
rdata_txt . txt = s_list
return rdata_txt
class DNSTest ( TestCase ) :
2016-05-13 12:52:18 +12:00
errcodes = dict ( ( v , k ) for k , v in vars ( dns ) . items ( ) if k . startswith ( ' DNS_RCODE_ ' ) )
2016-03-17 17:13:28 +13:00
def assert_dns_rcode_equals ( self , packet , rcode ) :
" Helper function to check return code "
p_errcode = packet . operation & 0x000F
self . assertEquals ( p_errcode , rcode , " Expected RCODE %s , got %s " %
( self . errcodes [ rcode ] , self . errcodes [ p_errcode ] ) )
def assert_dns_opcode_equals ( self , packet , opcode ) :
" Helper function to check opcode "
p_opcode = packet . operation & 0x7800
self . assertEquals ( p_opcode , opcode , " Expected OPCODE %s , got %s " %
( opcode , p_opcode ) )
def make_name_packet ( self , opcode , qid = None ) :
" Helper creating a dns.name_packet "
p = dns . name_packet ( )
if qid is None :
p . id = random . randint ( 0x0 , 0xffff )
p . operation = opcode
p . questions = [ ]
return p
def finish_name_packet ( self , packet , questions ) :
" Helper to finalize a dns.name_packet "
packet . qdcount = len ( questions )
packet . questions = questions
def make_name_question ( self , name , qtype , qclass ) :
" Helper creating a dns.name_question "
q = dns . name_question ( )
q . name = name
q . question_type = qtype
q . question_class = qclass
return q
def get_dns_domain ( self ) :
" Helper to get dns domain "
return self . creds . get_realm ( ) . lower ( )
def dns_transaction_udp ( self , packet , host = server_ip ,
dump = False , timeout = timeout ) :
" send a DNS query and read the reply "
s = None
try :
send_packet = ndr . ndr_pack ( packet )
if dump :
2018-03-09 13:38:42 +00:00
print ( self . hexdump ( send_packet ) )
2016-03-17 17:13:28 +13:00
s = socket . socket ( socket . AF_INET , socket . SOCK_DGRAM , 0 )
s . settimeout ( timeout )
s . connect ( ( host , 53 ) )
s . send ( send_packet , 0 )
recv_packet = s . recv ( 2048 , 0 )
if dump :
2018-03-09 13:38:42 +00:00
print ( self . hexdump ( recv_packet ) )
2016-03-17 17:13:28 +13:00
return ndr . ndr_unpack ( dns . name_packet , recv_packet )
finally :
if s is not None :
s . close ( )
def make_cname_update ( self , key , value ) :
p = self . make_name_packet ( dns . DNS_OPCODE_UPDATE )
name = self . get_dns_domain ( )
u = self . make_name_question ( name , dns . DNS_QTYPE_SOA , dns . DNS_QCLASS_IN )
self . finish_name_packet ( p , [ u ] )
r = dns . res_rec ( )
r . name = key
r . rr_type = dns . DNS_QTYPE_CNAME
r . rr_class = dns . DNS_QCLASS_IN
r . ttl = 900
r . length = 0xffff
rdata = value
r . rdata = rdata
p . nscount = 1
p . nsrecs = [ r ]
response = self . dns_transaction_udp ( p )
self . assert_dns_rcode_equals ( response , dns . DNS_RCODE_OK )
def contact_real_server ( host , port ) :
s = socket . socket ( socket . AF_INET , socket . SOCK_DGRAM , 0 )
s . connect ( ( host , port ) )
return s
class TestDnsForwarding ( DNSTest ) :
def __init__ ( self , * args , * * kwargs ) :
super ( TestDnsForwarding , self ) . __init__ ( * args , * * kwargs )
self . subprocesses = [ ]
def setUp ( self ) :
super ( TestDnsForwarding , self ) . setUp ( )
self . server = server_name
self . server_ip = server_ip
self . lp = lp
self . creds = creds
def start_toy_server ( self , host , port , id ) :
python = sys . executable
p = subprocess . Popen ( [ python ,
os . path . join ( samba . source_tree_topdir ( ) ,
' python/samba/tests/ '
' dns_forwarder_helpers/server.py ' ) ,
host , str ( port ) , id ] )
self . subprocesses . append ( p )
s = socket . socket ( socket . AF_INET , socket . SOCK_DGRAM , 0 )
2018-05-04 12:16:38 +01:00
for i in range ( 300 ) :
2016-09-06 10:48:57 +12:00
time . sleep ( 0.05 )
2016-03-17 17:13:28 +13:00
s . connect ( ( host , port ) )
try :
s . send ( ' timeout 0 ' , 0 )
2018-02-14 10:27:52 +13:00
except socket . error as e :
2016-03-17 17:13:28 +13:00
if e . errno in ( errno . ECONNREFUSED , errno . EHOSTUNREACH ) :
continue
2016-07-07 16:58:27 +12:00
if p . returncode is not None :
self . fail ( " Toy server has managed to die already! " )
2016-03-17 17:13:28 +13:00
return s
def tearDown ( self ) :
super ( TestDnsForwarding , self ) . tearDown ( )
for p in self . subprocesses :
p . kill ( )
def test_comatose_forwarder ( self ) :
2016-04-11 15:18:34 +12:00
s = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
2016-03-17 17:13:28 +13:00
s . send ( " timeout 1000000 " , 0 )
# make DNS query
name = " an-address-that-will-not-resolve "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_TXT , dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
send_packet = ndr . ndr_pack ( p )
s . send ( send_packet , 0 )
s . settimeout ( 1 )
try :
s . recv ( 0xffff + 2 , 0 )
self . fail ( " DNS forwarder should have been inactive " )
except socket . timeout :
# Expected forwarder to be dead
pass
2016-04-13 13:09:41 +12:00
def test_no_active_forwarder ( self ) :
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_TXT , dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
send_packet = ndr . ndr_pack ( p )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_SERVFAIL )
self . assertEqual ( data . ancount , 0 )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
2016-04-18 16:31:17 +12:00
def test_no_flag_recursive_forwarder ( self ) :
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_TXT , dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
send_packet = ndr . ndr_pack ( p )
self . finish_name_packet ( p , questions )
# Leave off the recursive flag
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_NXDOMAIN )
self . assertEqual ( data . ancount , 0 )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
2016-03-17 17:13:28 +13:00
def test_single_forwarder ( self ) :
2016-04-11 15:18:34 +12:00
s = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
2016-03-17 17:13:28 +13:00
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_CNAME ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( ' forwarder1 ' , data . answers [ 0 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_single_forwarder_not_actually_there ( self ) :
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_CNAME ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_SERVFAIL )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_single_forwarder_waiting_forever ( self ) :
2016-04-11 15:18:34 +12:00
s = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
2016-03-17 17:13:28 +13:00
s . send ( ' timeout 10000 ' , 0 )
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_CNAME ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_SERVFAIL )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_double_forwarder_first_frozen ( self ) :
2016-04-11 15:18:34 +12:00
if len ( dns_servers ) < 2 :
2018-03-09 13:38:42 +00:00
print ( " Ignoring test_double_forwarder_first_frozen " )
2016-04-11 15:18:34 +12:00
return
s1 = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
s2 = self . start_toy_server ( dns_servers [ 1 ] , 53 , ' forwarder2 ' )
2016-03-17 17:13:28 +13:00
s1 . send ( ' timeout 1000 ' , 0 )
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_CNAME ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( ' forwarder2 ' , data . answers [ 0 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_double_forwarder_first_down ( self ) :
2016-04-11 15:18:34 +12:00
if len ( dns_servers ) < 2 :
2018-03-09 13:38:42 +00:00
print ( " Ignoring test_double_forwarder_first_down " )
2016-04-11 15:18:34 +12:00
return
s2 = self . start_toy_server ( dns_servers [ 1 ] , 53 , ' forwarder2 ' )
2016-03-17 17:13:28 +13:00
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_CNAME ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( ' forwarder2 ' , data . answers [ 0 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_double_forwarder_both_slow ( self ) :
2016-04-11 15:18:34 +12:00
if len ( dns_servers ) < 2 :
2018-03-09 13:38:42 +00:00
print ( " Ignoring test_double_forwarder_both_slow " )
2016-04-11 15:18:34 +12:00
return
s1 = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
s2 = self . start_toy_server ( dns_servers [ 1 ] , 53 , ' forwarder2 ' )
2016-03-17 17:13:28 +13:00
s1 . send ( ' timeout 1.5 ' , 0 )
s2 . send ( ' timeout 1.5 ' , 0 )
ad = contact_real_server ( server_ip , 53 )
name = " dsfsfds.dsfsdfs "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_CNAME ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( ' forwarder1 ' , data . answers [ 0 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_cname ( self ) :
2016-04-11 15:18:34 +12:00
s1 = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
2016-03-17 17:13:28 +13:00
ad = contact_real_server ( server_ip , 53 )
name = " resolve.cname "
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_CNAME ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
2016-06-09 03:52:38 +02:00
self . assertEqual ( len ( data . answers ) , 1 )
self . assertEqual ( ' forwarder1 ' , data . answers [ 0 ] . rdata )
2016-03-17 17:13:28 +13:00
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_double_cname ( self ) :
2016-04-11 15:18:34 +12:00
s1 = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
2016-03-17 17:13:28 +13:00
name = ' resolve.cname. %s ' % self . get_dns_domain ( )
self . make_cname_update ( name , " dsfsfds.dsfsdfs " )
ad = contact_real_server ( server_ip , 53 )
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_A ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( ' forwarder1 ' , data . answers [ 1 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_cname_forwarding_with_slow_server ( self ) :
2016-04-11 15:18:34 +12:00
if len ( dns_servers ) < 2 :
2018-03-09 13:38:42 +00:00
print ( " Ignoring test_cname_forwarding_with_slow_server " )
2016-04-11 15:18:34 +12:00
return
s1 = self . start_toy_server ( dns_servers [ 0 ] , 53 , ' forwarder1 ' )
s2 = self . start_toy_server ( dns_servers [ 1 ] , 53 , ' forwarder2 ' )
2016-03-17 17:13:28 +13:00
s1 . send ( ' timeout 10000 ' , 0 )
name = ' resolve.cname. %s ' % self . get_dns_domain ( )
self . make_cname_update ( name , " dsfsfds.dsfsdfs " )
ad = contact_real_server ( server_ip , 53 )
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name , dns . DNS_QTYPE_A ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( ' forwarder2 ' , data . answers [ - 1 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_cname_forwarding_with_server_down ( self ) :
2016-04-11 15:18:34 +12:00
if len ( dns_servers ) < 2 :
2018-03-09 13:38:42 +00:00
print ( " Ignoring test_cname_forwarding_with_server_down " )
2016-04-11 15:18:34 +12:00
return
s2 = self . start_toy_server ( dns_servers [ 1 ] , 53 , ' forwarder2 ' )
2016-03-17 17:13:28 +13:00
name1 = ' resolve1.cname. %s ' % self . get_dns_domain ( )
name2 = ' resolve2.cname. %s ' % self . get_dns_domain ( )
self . make_cname_update ( name1 , name2 )
self . make_cname_update ( name2 , " dsfsfds.dsfsdfs " )
ad = contact_real_server ( server_ip , 53 )
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name1 , dns . DNS_QTYPE_A ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( ' forwarder2 ' , data . answers [ - 1 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
def test_cname_forwarding_with_lots_of_cnames ( self ) :
name3 = ' resolve3.cname. %s ' % self . get_dns_domain ( )
2016-04-11 15:18:34 +12:00
s1 = self . start_toy_server ( dns_servers [ 0 ] , 53 , name3 )
2016-03-17 17:13:28 +13:00
name1 = ' resolve1.cname. %s ' % self . get_dns_domain ( )
name2 = ' resolve2.cname. %s ' % self . get_dns_domain ( )
self . make_cname_update ( name1 , name2 )
self . make_cname_update ( name3 , name1 )
self . make_cname_update ( name2 , " dsfsfds.dsfsdfs " )
ad = contact_real_server ( server_ip , 53 )
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name1 , dns . DNS_QTYPE_A ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
p . operation | = dns . DNS_FLAG_RECURSION_DESIRED
send_packet = ndr . ndr_pack ( p )
ad . send ( send_packet , 0 )
ad . settimeout ( timeout )
try :
data = ad . recv ( 0xffff + 2 , 0 )
data = ndr . ndr_unpack ( dns . name_packet , data )
# This should cause a loop in Windows
# (which is restricted by a 20 CNAME limit)
#
# The reason it doesn't here is because forwarded CNAME have no
# additional processing in the internal DNS server.
self . assert_dns_rcode_equals ( data , dns . DNS_RCODE_OK )
self . assertEqual ( name3 , data . answers [ - 1 ] . rdata )
except socket . timeout :
self . fail ( " DNS server is too slow (timeout %s ) " % timeout )
TestProgram ( module = __name__ , opts = subunitopts )