# Unix SMB/CIFS implementation.
# Copyright (C) Kai Blin <kai@samba.org> 2011
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import print_function
2018-05-09 18:02:28 +12:00
from samba import dsdb
from samba . ndr import ndr_unpack , ndr_pack
from samba . samdb import SamDB
from samba . auth import system_session
import ldb
2011-11-11 00:32:09 +01:00
import os
2016-01-29 17:03:56 +13:00
import sys
2011-11-11 00:32:09 +01:00
import struct
import random
2013-07-05 12:07:49 +02:00
import socket
2011-11-11 00:32:09 +01:00
import samba . ndr as ndr
2014-12-16 18:04:13 +01:00
from samba import credentials , param
2016-01-06 14:12:35 +13:00
from samba . dcerpc import dns , dnsp , dnsserver
2016-01-28 12:54:58 +13:00
from samba . netcmd . dns import TXTRecord , dns_record_match , data_to_dns_record
2016-01-29 17:03:56 +13:00
from samba . tests . subunitrun import SubunitOptions , TestProgram
2017-06-08 16:20:42 +12:00
from samba import werror , WERRORError
2017-06-01 13:26:37 +12:00
from samba . tests . dns_base import DNSTest
2016-01-29 17:03:56 +13:00
import samba . getopt as options
import optparse
parser = optparse . OptionParser ( " dns.py <server name> <server ip> [options] " )
sambaopts = options . SambaOptions ( parser )
parser . add_option_group ( sambaopts )
2011-11-11 00:32:09 +01:00
2016-01-21 15:43:55 +13:00
# This timeout only has relevance when testing against Windows
# Format errors tend to return patchy responses, so a timeout is needed.
2016-01-29 17:03:56 +13:00
parser . add_option ( " --timeout " , type = " int " , dest = " timeout " ,
help = " Specify timeout for DNS requests " )
# use command line creds if available
credopts = options . CredentialsOptions ( parser )
parser . add_option_group ( credopts )
subunitopts = SubunitOptions ( parser )
parser . add_option_group ( subunitopts )
opts , args = parser . parse_args ( )
lp = sambaopts . get_loadparm ( )
creds = credopts . get_credentials ( lp )
timeout = opts . timeout
if len ( args ) < 2 :
parser . print_usage ( )
sys . exit ( 1 )
server_name = args [ 0 ]
server_ip = args [ 1 ]
creds . set_krb_forwardable ( credentials . NO_KRB_FORWARDABLE )
2013-01-14 00:56:48 +01:00
2012-05-30 08:08:53 +02:00
class TestSimpleQueries(DNSTest):
    """Single-packet DNS queries sent to the server under test."""

    def setUp(self):
        """Expose the command-line connection settings on the test case."""
        super(TestSimpleQueries, self).setUp()
        # The module-level values are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _server_fqdn(self):
        """Return "<server>.<dns domain>" for the server under test."""
        return "%s.%s" % (self.server, self.get_dns_domain())

    def _make_query(self, name, qtype, qclass=dns.DNS_QCLASS_IN,
                    announce=False):
        """Build a QUERY packet carrying exactly one question.

        :param name: name to ask about
        :param qtype: dns.DNS_QTYPE_* value of the question
        :param qclass: dns.DNS_QCLASS_* value of the question
        :param announce: when true, print the question name before sending
        :return: the finished packet, ready for dns_transaction_udp/tcp
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, qtype, qclass)
        if announce:
            print("asking for", q.name)
        self.finish_name_packet(p, [q])
        return p

    def test_one_a_query(self):
        """create a query packet containing one query record"""
        p = self._make_query(self._server_fqdn(), dns.DNS_QTYPE_A,
                             announce=True)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_SOA_query(self):
        """create a query packet containing one query record for the SOA"""
        p = self._make_query(self.get_dns_domain(), dns.DNS_QTYPE_SOA,
                             announce=True)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        # The primary name server is compared case-insensitively.
        self.assertEqual(response.answers[0].rdata.mname.upper(),
                         self._server_fqdn().upper())

    def test_one_a_query_tcp(self):
        """create a query packet containing one query record via TCP"""
        p = self._make_query(self._server_fqdn(), dns.DNS_QTYPE_A,
                             announce=True)
        response, _ = self.dns_transaction_tcp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_mx_query(self):
        """create a query packet causing an empty RCODE_OK answer"""
        # An existing name without an MX record: OK, but no answers.
        p = self._make_query(self._server_fqdn(), dns.DNS_QTYPE_MX,
                             announce=True)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        # A name that does not exist at all: NXDOMAIN.
        name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
        p = self._make_query(name, dns.DNS_QTYPE_MX, announce=True)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

    def test_two_queries(self):
        """create a query packet containing two query records"""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        bogus = "%s.%s" % ('bogusname', self.get_dns_domain())
        questions = [
            self.make_name_question(self._server_fqdn(), dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
            self.make_name_question(bogus, dns.DNS_QTYPE_A,
                                    dns.DNS_QCLASS_IN),
        ]
        self.finish_name_packet(p, questions)
        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_qtype_all_query(self):
        """create a QTYPE_ALL query"""
        p = self._make_query(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                             announce=True)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)

        # One answer is expected for server_ip, plus a second one when the
        # environment provides SERVER_IPV6.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, num_answers)
        self.assertEqual(response.answers[0].rdata, self.server_ip)
        if dc_ipv6 is not None:
            self.assertEqual(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        """create a QCLASS_NONE query"""
        p = self._make_query(self._server_fqdn(), dns.DNS_QTYPE_ALL,
                             qclass=dns.DNS_QCLASS_NONE)
        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_soa_hostname_query(self):
        """create a SOA query for a hostname"""
        p = self._make_query(self._server_fqdn(), dns.DNS_QTYPE_SOA)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # We don't get SOA records for single hosts
        self.assertEqual(response.ancount, 0)
        # But we do respond with an authority section
        self.assertEqual(response.nscount, 1)

    def test_soa_domain_query(self):
        """create a SOA query for a domain"""
        p = self._make_query(self.get_dns_domain(), dns.DNS_QTYPE_SOA)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata.minimum, 3600)


class TestDNSUpdates(DNSTest):
    """DNS dynamic update tests run against the server under test."""

    def setUp(self):
        """Expose the command-line connection settings on the test case."""
        super(TestDNSUpdates, self).setUp()
        # The module-level values are only read here, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def _zone_update_packet(self):
        """Return an UPDATE packet whose first section names the DNS domain.

        The entry is built with make_name_question() as an SOA/IN question
        for get_dns_domain().
        """
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])
        return p

    def _prereq_update_packet(self, name, rr_class, ttl, length):
        """Return an UPDATE packet carrying one TXT prerequisite record.

        The record has no rdata; it is stored in the packet's ``answers``
        list with ``ancount`` set to 1.
        """
        p = self._zone_update_packet()
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = length
        p.ancount = 1
        p.answers = [r]
        return p

    def _send_txt_update(self, name, rr_class, ttl):
        """Send an update for the fixed test TXT record and require RCODE_OK.

        The tests below use rr_class DNS_QCLASS_IN with ttl 900 to add the
        record and DNS_QCLASS_NONE with ttl 0 to delete it.
        """
        p = self._zone_update_packet()
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = self.make_txt_record(['"This is a test"'])
        p.nscount = 1
        p.nsrecs = [r]
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def _assert_txt_query_rcode(self, name, rcode):
        """Query ``name`` for TXT/IN and require the given response code."""
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, rcode)

    def test_two_updates(self):
        """create two update requests"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        domain = self.get_dns_domain()
        updates = [
            self.make_name_question("%s.%s" % (self.server, domain),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
            self.make_name_question(domain,
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN),
        ]
        self.finish_name_packet(p, updates)
        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_update_wrong_qclass(self):
        """create update with DNS_QCLASS_NONE"""
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
        self.finish_name_packet(p, [u])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        """test update with a non-null TTL"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._prereq_update_packet(name, dns.DNS_QCLASS_NONE,
                                       ttl=1, length=0)
        try:
            response, _ = self.dns_transaction_udp(p, host=self.server_ip)
            self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted queries.
            # Although this appears to be non-deterministic even for the same
            # request twice, it also appears to be based on how poorly the
            # request is formatted.
            pass

    def test_update_prereq_with_non_null_length(self):
        """test update with a non-null length"""
        name = "%s.%s" % (self.server, self.get_dns_domain())
        p = self._prereq_update_packet(name, dns.DNS_QCLASS_ANY,
                                       ttl=0, length=1)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_prereq_nonexisting_name(self):
        """test update with a nonexisting name"""
        name = "idontexist.%s" % self.get_dns_domain()
        p = self._prereq_update_packet(name, dns.DNS_QCLASS_ANY,
                                       ttl=0, length=0)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        """test adding records works"""
        prefix, txt = 'textrec', ['"This is a test"']
        p = self.make_txt_update(prefix, txt)
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, txt)

    def test_delete_record(self):
        """Test if deleting records works"""
        NAME = "deleterec.%s" % self.get_dns_domain()

        # First, create a record to make sure we have a record to delete.
        self._send_txt_update(NAME, dns.DNS_QCLASS_IN, 900)
        # Now check the record is around
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_OK)
        # Now delete the record
        self._send_txt_update(NAME, dns.DNS_QCLASS_NONE, 0)
        # And finally check it's gone
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_NXDOMAIN)

    def test_readd_record(self):
        """Test if adding, deleting and then readding a records works"""
        NAME = "readdrec.%s" % self.get_dns_domain()

        # Create the record
        self._send_txt_update(NAME, dns.DNS_QCLASS_IN, 900)
        # Now check the record is around
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_OK)
        # Now delete the record
        self._send_txt_update(NAME, dns.DNS_QCLASS_NONE, 0)
        # check it's gone
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_NXDOMAIN)
        # recreate the record
        self._send_txt_update(NAME, dns.DNS_QCLASS_IN, 900)
        # Now check the record is around
        self._assert_txt_query_rcode(NAME, dns.DNS_RCODE_OK)

    def test_update_add_mx_record(self):
        """test adding MX records works"""
        domain = self.get_dns_domain()
        exchange = 'mail.%s' % domain

        # Add an MX record at the domain name itself.
        p = self._zone_update_packet()
        r = dns.res_rec()
        r.name = domain
        r.rr_type = dns.DNS_QTYPE_MX
        r.rr_class = dns.DNS_QCLASS_IN
        r.ttl = 900
        r.length = 0xffff
        rdata = dns.mx_record()
        rdata.preference = 10
        rdata.exchange = exchange
        r.rdata = rdata
        p.nscount = 1
        p.nsrecs = [r]
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Query it back and compare every field that was set.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(domain, dns.DNS_QTYPE_MX,
                                    dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        response, _ = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEqual(response.ancount, 1)
        ans = response.answers[0]
        self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
        self.assertEqual(ans.rdata.preference, 10)
        self.assertEqual(ans.rdata.exchange, exchange)


class TestComplexQueries ( DNSTest ) :
2016-04-06 15:44:58 +12:00
def make_dns_update ( self , key , value , qtype ) :
2012-06-01 08:05:54 +02:00
p = self . make_name_packet ( dns . DNS_OPCODE_UPDATE )
name = self . get_dns_domain ( )
u = self . make_name_question ( name , dns . DNS_QTYPE_SOA , dns . DNS_QCLASS_IN )
2016-04-06 15:44:58 +12:00
self . finish_name_packet ( p , [ u ] )
2012-06-01 08:05:54 +02:00
r = dns . res_rec ( )
2016-04-06 15:44:58 +12:00
r . name = key
r . rr_type = qtype
2012-06-01 08:05:54 +02:00
r . rr_class = dns . DNS_QCLASS_IN
r . ttl = 900
r . length = 0xffff
2018-05-09 18:02:28 +12:00
r . rdata = value
2016-04-06 15:44:58 +12:00
p . nscount = 1
2018-05-09 18:02:28 +12:00
p . nsrecs = [ r ]
2017-06-09 10:00:09 +12:00
( response , response_packet ) = self . dns_transaction_udp ( p , host = server_ip )
2012-06-01 08:05:54 +02:00
self . assert_dns_rcode_equals ( response , dns . DNS_RCODE_OK )
2016-04-06 15:44:58 +12:00
def setUp ( self ) :
super ( TestComplexQueries , self ) . setUp ( )
2017-06-09 10:00:09 +12:00
global server , server_ip , lp , creds , timeout
self . server = server_name
self . server_ip = server_ip
self . lp = lp
self . creds = creds
self . timeout = timeout
2012-06-01 08:05:54 +02:00
2017-06-08 16:20:42 +12:00
def test_one_a_query ( self ) :
" create a query packet containing one query record "
2012-06-01 08:05:54 +02:00
2017-06-08 16:20:42 +12:00
try :
2012-06-01 08:05:54 +02:00
2017-06-08 16:20:42 +12:00
# Create the record
name = " cname_test. %s " % self . get_dns_domain ( )
rdata = " %s . %s " % ( self . server , self . get_dns_domain ( ) )
self . make_dns_update ( name , rdata , dns . DNS_QTYPE_CNAME )
2012-06-01 08:05:54 +02:00
2017-06-08 16:20:42 +12:00
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
2012-06-01 08:05:54 +02:00
2017-06-08 16:20:42 +12:00
# Check the record
name = " cname_test. %s " % self . get_dns_domain ( )
q = self . make_name_question ( name , dns . DNS_QTYPE_A , dns . DNS_QCLASS_IN )
2018-03-09 13:38:42 +00:00
print ( " asking for " , q . name )
2017-06-08 16:20:42 +12:00
questions . append ( q )
2012-06-01 08:05:54 +02:00
2017-06-08 16:20:42 +12:00
self . finish_name_packet ( p , questions )
2017-06-09 10:00:09 +12:00
( response , response_packet ) = self . dns_transaction_udp ( p , host = self . server_ip )
2017-06-08 16:20:42 +12:00
self . assert_dns_rcode_equals ( response , dns . DNS_RCODE_OK )
self . assert_dns_opcode_equals ( response , dns . DNS_OPCODE_QUERY )
self . assertEquals ( response . ancount , 2 )
self . assertEquals ( response . answers [ 0 ] . rr_type , dns . DNS_QTYPE_CNAME )
self . assertEquals ( response . answers [ 0 ] . rdata , " %s . %s " %
( self . server , self . get_dns_domain ( ) ) )
self . assertEquals ( response . answers [ 1 ] . rr_type , dns . DNS_QTYPE_A )
self . assertEquals ( response . answers [ 1 ] . rdata ,
self . server_ip )
finally :
# Delete the record
p = self . make_name_packet ( dns . DNS_OPCODE_UPDATE )
updates = [ ]
name = self . get_dns_domain ( )
u = self . make_name_question ( name , dns . DNS_QTYPE_SOA , dns . DNS_QCLASS_IN )
updates . append ( u )
self . finish_name_packet ( p , updates )
updates = [ ]
r = dns . res_rec ( )
r . name = " cname_test. %s " % self . get_dns_domain ( )
r . rr_type = dns . DNS_QTYPE_CNAME
r . rr_class = dns . DNS_QCLASS_NONE
r . ttl = 0
r . length = 0xffff
r . rdata = " %s . %s " % ( self . server , self . get_dns_domain ( ) )
updates . append ( r )
p . nscount = len ( updates )
p . nsrecs = updates
2017-06-09 10:00:09 +12:00
( response , response_packet ) = self . dns_transaction_udp ( p , host = self . server_ip )
2017-06-08 16:20:42 +12:00
self . assert_dns_rcode_equals ( response , dns . DNS_RCODE_OK )
2012-06-01 08:05:54 +02:00
2016-04-06 15:44:58 +12:00
def test_cname_two_chain ( self ) :
name0 = " cnamechain0. %s " % self . get_dns_domain ( )
name1 = " cnamechain1. %s " % self . get_dns_domain ( )
name2 = " cnamechain2. %s " % self . get_dns_domain ( )
self . make_dns_update ( name1 , name2 , dns . DNS_QTYPE_CNAME )
self . make_dns_update ( name2 , name0 , dns . DNS_QTYPE_CNAME )
self . make_dns_update ( name0 , server_ip , dns . DNS_QTYPE_A )
p = self . make_name_packet ( dns . DNS_OPCODE_QUERY )
questions = [ ]
q = self . make_name_question ( name1 , dns . DNS_QTYPE_A ,
dns . DNS_QCLASS_IN )
questions . append ( q )
self . finish_name_packet ( p , questions )
2017-06-09 10:00:09 +12:00
( response , response_packet ) = self . dns_transaction_udp ( p , host = server_ip )
2016-04-06 15:44:58 +12:00
self . assert_dns_rcode_equals ( response , dns . DNS_RCODE_OK )
self . assert_dns_opcode_equals ( response , dns . DNS_OPCODE_QUERY )
self . assertEquals ( response . ancount , 3 )
self . assertEquals ( response . answers [ 0 ] . rr_type , dns . DNS_QTYPE_CNAME )
self . assertEquals ( response . answers [ 0 ] . name , name1 )
self . assertEquals ( response . answers [ 0 ] . rdata , name2 )
self . assertEquals ( response . answers [ 1 ] . rr_type , dns . DNS_QTYPE_CNAME )
self . assertEquals ( response . answers [ 1 ] . name , name2 )
self . assertEquals ( response . answers [ 1 ] . rdata , name0 )
self . assertEquals ( response . answers [ 2 ] . rr_type , dns . DNS_QTYPE_A )
self . assertEquals ( response . answers [ 2 ] . rdata ,
self . server_ip )
2016-12-07 16:42:38 +13:00
def test_invalid_empty_cname(self):
    """Adding a CNAME with an empty target must be rejected.

    make_dns_update is expected to raise AssertionError; if it returns
    normally the test fails.
    """
    # NOTE(review): string literals are reproduced exactly as found in
    # this file, including padding spaces -- confirm against upstream.
    name0 = " cnamedotprefix0. %s " % self.get_dns_domain()
    rejected = False
    try:
        self.make_dns_update(name0, " ", dns.DNS_QTYPE_CNAME)
    except AssertionError:
        rejected = True
    # The failure is raised outside the try block so that it is not
    # swallowed by the AssertionError handler above.
    if not rejected:
        self.fail(" Successfully added empty CNAME, which is invalid. ")
2016-04-06 15:44:58 +12:00
def test_cname_two_chain_not_matching_qtype(self):
    """Query a CNAME chain for a record type the final name lacks.

    The chain name1 -> name2 -> name0 ends in an A record only.  Asking
    for TXT must still succeed and return both intermediate CNAMEs, but
    no record for the final name.
    """
    # NOTE(review): string literals are reproduced exactly as found in
    # this file, including padding spaces -- confirm against upstream.
    name0 = " cnamechain0. %s " % self.get_dns_domain()
    name1 = " cnamechain1. %s " % self.get_dns_domain()
    name2 = " cnamechain2. %s " % self.get_dns_domain()
    self.make_dns_update(name1, name2, dns.DNS_QTYPE_CNAME)
    self.make_dns_update(name2, name0, dns.DNS_QTYPE_CNAME)
    # Use the instance attribute rather than the module-level global.
    self.make_dns_update(name0, self.server_ip, dns.DNS_QTYPE_A)

    p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    q = self.make_name_question(name1, dns.DNS_QTYPE_TXT,
                                dns.DNS_QCLASS_IN)
    self.finish_name_packet(p, [q])
    (response, _) = self.dns_transaction_udp(p, host=self.server_ip)

    self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)

    # A CNAME lookup returns all intermediate results.  Only the A
    # record exists at the end of the chain, not a TXT record, so just
    # the two CNAMEs are expected.
    self.assertEqual(response.ancount, 2)
    chain = [(name1, name2), (name2, name0)]
    for idx, (owner, target) in enumerate(chain):
        answer = response.answers[idx]
        self.assertEqual(answer.rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEqual(answer.name, owner)
        self.assertEqual(answer.rdata, target)
2016-04-06 15:44:58 +12:00
2012-09-21 23:06:13 +02:00
class TestInvalidQueries(DNSTest):
    """Tests that send unexpected input to the DNS server.

    NOTE(review): string literals other than the socket payload and the
    struct format are reproduced exactly as found in this file, including
    padding spaces -- confirm against upstream.
    """

    def setUp(self):
        """Copy the module-level connection settings onto the instance."""
        super(TestInvalidQueries, self).setUp()
        # The module-level names are only read, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

    def test_one_a_query(self):
        """Send a 0-byte datagram, then check a normal A query still works."""
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((self.server_ip, 53))
            # The payload is empty, as the test's purpose states, and is
            # a bytes object because Python 3 sockets reject str.
            s.send(b"", 0)
        finally:
            if s is not None:
                s.close()

        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = " %s . %s " % (self.server, self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print(" asking for ", q.name)
        self.finish_name_packet(p, [q])

        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rdata, self.server_ip)

    def test_one_a_reply(self):
        """Send a packet flagged as a reply over TCP; expect no data back.

        Either the server closes the connection without answering (recv
        returns 0 bytes) or, as observed with Windows, it does not respond
        at all and the socket times out.
        """
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        name = " %s . %s " % (' fakefakefake ', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print(" asking for ", q.name)
        self.finish_name_packet(p, [q])
        # Turn the query into a reply before sending it.
        p.operation |= dns.DNS_FLAG_REPLY

        s = None
        try:
            send_packet = ndr.ndr_pack(p)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            # Use the timeout stored by setUp instead of the global.
            s.settimeout(self.timeout)
            s.connect((self.server_ip, 53))
            # The packet is prefixed with its length as a network-order
            # unsigned short.  The byte-order character has to be the
            # first character of a struct format string.
            tcp_packet = struct.pack('!H', len(send_packet)) + send_packet
            s.send(tcp_packet, 0)
            recv_packet = s.recv(0xffff + 2, 0)
            self.assertEqual(0, len(recv_packet))
        except socket.timeout:
            # Windows chooses not to respond to incorrectly formatted
            # queries.  Although this appears to be non-deterministic even
            # for the same request twice, it also appears to be based on
            # how poorly the request is formatted.
            pass
        finally:
            if s is not None:
                s.close()
2014-12-16 18:04:13 +01:00
class TestZones(DNSTest):
    """Zone creation/deletion plus record aging and scavenging tests.

    The tests talk to the server over three channels: the dnsserver RPC
    pipe (zone management), LDAP via SamDB (raw dnsRecord inspection and
    modification) and plain DNS updates/queries.

    NOTE(review): apart from the ``zone`` keyword name and the dsdb
    attribute name, string literals are reproduced exactly as found in
    this file, including padding spaces -- confirm against upstream.
    """

    def setUp(self):
        """Store module-level settings and open the RPC and LDAP connections."""
        super(TestZones, self).setUp()
        # The module-level names are only read, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.timeout = timeout

        self.zone = " test.lan "
        self.rpc_conn = dnsserver.dnsserver(" ncacn_ip_tcp: %s [sign] " %
                                            (self.server_ip),
                                            self.lp, self.creds)
        self.samdb = SamDB(url=" ldap:// " + self.server_ip,
                           lp=self.get_loadparm(),
                           session_info=system_session(),
                           credentials=self.creds)
        self.zone_dn = (" DC= " + self.zone +
                        " ,CN=MicrosoftDNS,DC=DomainDNSZones, " +
                        str(self.samdb.get_default_basedn()))

    def tearDown(self):
        """Delete the test zone, tolerating a zone that does not exist."""
        super(TestZones, self).tearDown()

        try:
            self.delete_zone(self.zone)
        except RuntimeError as e:
            (num, _) = e.args
            if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
                raise

    def create_zone(self, zone, aging_enabled=False):
        """Create a DS-integrated primary zone with the ZoneCreate operation.

        zone: name of the zone to create.
        aging_enabled: stored, as an int, in the request's fAging field.

        A WERRORError raised by the RPC call fails the test.
        """
        zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
        zone_create.pszZoneName = zone
        zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
        zone_create.fAging = int(aging_enabled)
        zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
        zone_create.fDsIntegrated = 1
        zone_create.fLoadExisting = 1
        zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
        try:
            client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
            self.rpc_conn.DnssrvOperation2(client_version,
                                           0,
                                           self.server_ip,
                                           None,
                                           0,
                                           ' ZoneCreate ',
                                           dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
                                           zone_create)
        except WERRORError as e:
            self.fail(str(e))

    def set_params(self, **kwargs):
        """Reset one dword property per keyword argument.

        The optional ``zone`` keyword selects the zone passed to the RPC
        call (None when absent); every other keyword is sent as a
        name/value pair.  A WERRORError fails the test.
        """
        # A keyword argument name cannot contain spaces, so the key has
        # to be looked up without padding to ever match ``zone=...``.
        zone = kwargs.pop('zone', None)
        client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
        nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
        for key, val in kwargs.items():
            name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
            name_param.dwParam = val
            name_param.pszNodeName = key
            try:
                self.rpc_conn.DnssrvOperation2(client_version, 0, self.server,
                                               zone, 0,
                                               ' ResetDwordProperty ',
                                               nap_type, name_param)
            except WERRORError as e:
                self.fail(str(e))

    def ldap_modify_dnsrecs(self, name, func):
        """Apply ``func`` to each DNS record of ``name`` and write them back.

        ``func`` is called once per unpacked record and is expected to
        modify it in place; the packed records then replace the stored
        attribute through an LDAP modify.
        """
        dn = ' DC= {} , {} '.format(name, self.zone_dn)
        dns_recs = self.ldap_get_dns_records(name)
        for rec in dns_recs:
            func(rec)
        update_dict = {' dn ': dn,
                       ' dnsRecord ': [ndr_pack(r) for r in dns_recs]}
        self.samdb.modify(ldb.Message.from_dict(self.samdb,
                                                update_dict,
                                                ldb.FLAG_MOD_REPLACE))

    def dns_update_record(self, prefix, txt):
        """Send a TXT update over DNS and return the matching stored record.

        Asserts that the update succeeds and that exactly one record read
        back over LDAP carries ``txt`` as its data.
        """
        p = self.make_txt_update(prefix, txt, self.zone)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        matching = [r for r in self.ldap_get_dns_records(prefix)
                    if r.data.str == txt]
        self.assertEqual(len(matching), 1)
        return matching[0]

    def ldap_get_records(self, name):
        """Return the LDAP search result for the dnsNode called ``name``."""
        dn = ' DC= {} , {} '.format(name, self.zone_dn)
        expr = " (&(objectClass=dnsNode)(name= {} )) ".format(name)
        return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
                                 expression=expr, attrs=[" * "])

    def ldap_get_dns_records(self, name):
        """Return the unpacked DNS records of the first node for ``name``."""
        records = self.ldap_get_records(name)
        return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
                for r in records[0].get(' dnsRecord ')]

    def ldap_get_zone_settings(self):
        """Return the zone's properties as a dict of lower-cased name -> data."""
        records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
                                    expression=" (&(objectClass=dnsZone) " +
                                    " (name= {} )) ".format(self.zone),
                                    attrs=[" dNSProperty "])
        self.assertEqual(len(records), 1)
        props = [ndr_unpack(dnsp.DnsProperty, r)
                 for r in records[0].get(' dNSProperty ')]

        # We have no choice but to repeat these here.
        zone_prop_ids = {0x00: " EMPTY ",
                         0x01: " TYPE ",
                         0x02: " ALLOW_UPDATE ",
                         0x08: " SECURE_TIME ",
                         0x10: " NOREFRESH_INTERVAL ",
                         0x11: " SCAVENGING_SERVERS ",
                         0x12: " AGING_ENABLED_TIME ",
                         0x20: " REFRESH_INTERVAL ",
                         0x40: " AGING_STATE ",
                         0x80: " DELETED_FROM_HOSTNAME ",
                         0x81: " MASTER_SERVERS ",
                         0x82: " AUTO_NS_SERVERS ",
                         0x83: " DCPROMO_CONVERT ",
                         0x90: " SCAVENGING_SERVERS_DA ",
                         0x91: " MASTER_SERVERS_DA ",
                         0x92: " NS_SERVERS_DA ",
                         0x100: " NODE_DBFLAGS "}
        return {zone_prop_ids[p.id].lower(): p.data for p in props}

    def set_aging(self, enable=False):
        """Create the test zone and set its aging flag and 1-unit intervals."""
        self.create_zone(self.zone, aging_enabled=enable)
        self.set_params(NoRefreshInterval=1, RefreshInterval=1,
                        Aging=int(bool(enable)), zone=self.zone,
                        AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)

    def test_set_aging(self, enable=True, name=' agingtest ', txt=None):
        """With aging enabled, a DNS update stores a non-zero timestamp.

        ``enable`` is accepted for interface compatibility only; aging is
        always switched on because the assertions below require it.
        ``txt`` defaults to a fresh list on every call rather than a
        shared mutable default.
        """
        if txt is None:
            txt = [' test txt ']
        self.set_aging(enable=True)
        settings = self.ldap_get_zone_settings()
        self.assertTrue(settings[' aging_state '] is not None)
        self.assertTrue(settings[' aging_state '])

        rec = self.dns_update_record(name, txt)
        self.assertNotEqual(rec.dwTimeStamp, 0)

    def test_set_aging_disabled(self):
        """With aging disabled, the aging state is false but present."""
        self.set_aging(enable=False)
        settings = self.ldap_get_zone_settings()
        self.assertTrue(settings[' aging_state '] is not None)
        self.assertFalse(settings[' aging_state '])

        rec = self.dns_update_record(' agingtest ', [' test txt '])
        self.assertNotEqual(rec.dwTimeStamp, 0)

    def test_aging_update(self, enable=True):
        """Check how a repeated update treats a back-dated timestamp.

        The record's timestamp is decreased over LDAP, then the same
        update is sent again.  With aging enabled the timestamp must
        return to its original value; with aging disabled it must stay at
        the decreased value.
        """
        name, txt = ' agingtest ', [' test txt ']
        self.set_aging(enable=True)
        before_mod = self.dns_update_record(name, txt)
        if not enable:
            self.set_params(zone=self.zone, Aging=0)
        dec = 2

        def mod_ts(rec):
            self.assertTrue(rec.dwTimeStamp > 0)
            rec.dwTimeStamp -= dec
        self.ldap_modify_dnsrecs(name, mod_ts)

        after_mod = self.ldap_get_dns_records(name)
        self.assertEqual(len(after_mod), 1)
        after_mod = after_mod[0]
        self.assertEqual(after_mod.dwTimeStamp,
                         before_mod.dwTimeStamp - dec)

        after_update = self.dns_update_record(name, txt)
        after_should_equal = before_mod if enable else after_mod
        self.assertEqual(after_should_equal.dwTimeStamp,
                         after_update.dwTimeStamp)

    def test_aging_update_disabled(self):
        """Run the aging update scenario with aging switched off."""
        self.test_aging_update(enable=False)

    def test_aging_refresh(self):
        """Updates refresh the timestamp only outside the no-refresh window.

        A record back-dated by half the interval must keep its timestamp
        on update; after being back-dated by a further interval and a
        half, an update must restore the original timestamp.
        """
        name, txt = ' agingtest ', [' test txt ']
        self.create_zone(self.zone, aging_enabled=True)
        interval = 10
        # Floor division keeps the timestamp arithmetic in integers on
        # Python 3 as well as Python 2.
        half = interval // 2
        self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                        Aging=1, zone=self.zone,
                        AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
        before_mod = self.dns_update_record(name, txt)

        def backdate_half(rec):
            self.assertTrue(rec.dwTimeStamp > 0)
            rec.dwTimeStamp -= half
        self.ldap_modify_dnsrecs(name, backdate_half)
        update_during_norefresh = self.dns_update_record(name, txt)

        def backdate_interval_and_half(rec):
            self.assertTrue(rec.dwTimeStamp > 0)
            rec.dwTimeStamp -= interval + half
        self.ldap_modify_dnsrecs(name, backdate_interval_and_half)
        update_during_refresh = self.dns_update_record(name, txt)

        self.assertEqual(update_during_norefresh.dwTimeStamp,
                         before_mod.dwTimeStamp - half)
        self.assertEqual(update_during_refresh.dwTimeStamp,
                         before_mod.dwTimeStamp)

    def test_rpc_add_no_timestamp(self):
        """A record added over RPC is stored with a zero timestamp."""
        name, txt = ' agingtest ', [' test txt ']
        self.set_aging(enable=True)
        rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        rec_buf.rec = TXTRecord(txt)
        self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                          0, self.server_ip,
                                          self.zone, name, rec_buf, None)
        recs = self.ldap_get_dns_records(name)
        self.assertEqual(len(recs), 1)
        self.assertEqual(recs[0].dwTimeStamp, 0)

    def test_basic_scavenging(self):
        """Scavenging tombstones a back-dated record and keeps a fresh one."""
        self.create_zone(self.zone, aging_enabled=True)
        interval = 1
        self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
                        zone=self.zone, Aging=1,
                        AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
        name, txt = ' agingtest ', [' test txt ']
        self.dns_update_record(name, txt)
        self.dns_update_record(name + ' 2 ', txt)

        def mod_ts(rec):
            self.assertTrue(rec.dwTimeStamp > 0)
            rec.dwTimeStamp -= interval * 5
        self.ldap_modify_dnsrecs(name, mod_ts)

        # Look the function up under the exact attribute name that is
        # called on the following line.
        self.assertTrue(callable(getattr(dsdb, '_scavenge_dns_records', None)))
        dsdb._scavenge_dns_records(self.samdb)

        recs = self.ldap_get_dns_records(name)
        self.assertEqual(len(recs), 1)
        self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)

        recs = self.ldap_get_dns_records(name + ' 2 ')
        self.assertEqual(len(recs), 1)
        self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)

    def delete_zone(self, zone):
        """Delete ``zone`` with the DeleteZoneFromDs RPC operation."""
        self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                       0,
                                       self.server_ip,
                                       zone,
                                       0,
                                       ' DeleteZoneFromDs ',
                                       dnsserver.DNSSRV_TYPEID_NULL,
                                       None)

    def test_soa_query(self):
        """An SOA query answers only while the zone exists.

        The same query is sent before the zone is created, while it
        exists, and after it is deleted.
        """
        zone = " test.lan "
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])

        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        # Windows returns OK while BIND logically seems to return NXDOMAIN
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)

        self.create_zone(zone)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 1)
        self.assertEqual(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)

        self.delete_zone(zone)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEqual(response.ancount, 0)
2016-01-27 17:41:44 +13:00
class TestRPCRoundtrip(DNSTest):
    """TXT records must look the same whether written via DNS or via RPC.

    Each scenario exists in two directions: a record added by DNS update
    is checked by DNS query and matched over RPC, and a record added over
    RPC is checked by DNS query and then removed again.

    NOTE(review): string literals are reproduced exactly as found in this
    file, including padding spaces -- confirm against upstream.
    """

    def setUp(self):
        """Store module-level settings and open the dnsserver RPC connection."""
        super(TestRPCRoundtrip, self).setUp()
        # The module-level names are only read, so no ``global``
        # declaration is needed.
        self.server = server_name
        self.server_ip = server_ip
        self.lp = lp
        self.creds = creds
        self.rpc_conn = dnsserver.dnsserver(" ncacn_ip_tcp: %s [sign] " %
                                            (self.server_ip),
                                            self.lp, self.creds)

    def tearDown(self):
        super(TestRPCRoundtrip, self).tearDown()

    def _rpc_add_query_delete(self, prefix, rpc_data, expected_txt):
        """Add a TXT record over RPC, query it over DNS, then delete it.

        prefix: host part of the record name.
        rpc_data: textual record data handed to data_to_dns_record.
        expected_txt: list of strings check_query_txt must find.

        A WERRORError while adding fails the test.  The record is removed
        even when the query check fails.
        """
        name = " %s . %s " % (prefix, self.get_dns_domain())
        add_rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
        add_rec_buf.rec = data_to_dns_record(dnsp.DNS_TYPE_TXT, rpc_data)
        try:
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0, self.server_ip, self.get_dns_domain(),
                name, add_rec_buf, None)
        except WERRORError as e:
            self.fail(str(e))

        try:
            self.check_query_txt(prefix, expected_txt)
        finally:
            # Passing the buffer as the "delete" argument removes it.
            self.rpc_conn.DnssrvUpdateRecord2(
                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                0, self.server_ip, self.get_dns_domain(),
                name, None, add_rec_buf)

    def _dns_add_query_match(self, prefix, txt, rpc_data, expected_txt=None):
        """Add a TXT record by DNS update, query it, and match it over RPC.

        prefix: host part of the record name.
        txt: list of strings sent in the DNS update.
        rpc_data: textual record data dns_record_match must find.
        expected_txt: list expected from the DNS query; defaults to txt.
        """
        if expected_txt is None:
            expected_txt = txt
        p = self.make_txt_update(prefix, txt)
        (response, _) = self.dns_transaction_udp(p, host=self.server_ip)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.check_query_txt(prefix, expected_txt)
        self.assertIsNotNone(
            dns_record_match(self.rpc_conn, self.server_ip,
                             self.get_dns_domain(),
                             " %s . %s " % (prefix, self.get_dns_domain()),
                             dnsp.DNS_TYPE_TXT, rpc_data))

    def test_update_add_txt_rpc_to_dns(self):
        """A quoted TXT string added over RPC is visible over DNS."""
        prefix, txt = ' rpctextrec ', [' " This is a test " ']
        self._rpc_add_query_delete(prefix,
                                   ' " \\ " This is a test \\ " " ',
                                   txt)

    def test_update_add_null_padded_txt_record(self):
        """TXT records padded with short strings round-trip from DNS to RPC."""
        self._dns_add_query_match(
            ' pad1textrec ',
            [' " This is a test " ', ' ', ' '],
            ' " \\ " This is a test \\ " " " " " " ')

        self._dns_add_query_match(
            ' pad2textrec ',
            [' " This is a test " ', ' ', ' ', ' more text '],
            ' " \\ " This is a test \\ " " " " " " " more text " ')

        self._dns_add_query_match(
            ' pad3textrec ',
            [' ', ' ', ' " This is a test " '],
            ' " " " " " \\ " This is a test \\ " " ')

    def test_update_add_padding_rpc_to_dns(self):
        """TXT records padded with short strings round-trip from RPC to DNS."""
        self._rpc_add_query_delete(
            ' rpc ' + ' pad1textrec ',
            ' " \\ " This is a test \\ " " " " " " ',
            [' " This is a test " ', ' ', ' '])

        self._rpc_add_query_delete(
            ' rpc ' + ' pad2textrec ',
            ' " \\ " This is a test \\ " " " " " " " more text " ',
            [' " This is a test " ', ' ', ' ', ' more text '])

        self._rpc_add_query_delete(
            ' rpc ' + ' pad3textrec ',
            ' " " " " " \\ " This is a test \\ " " ',
            [' ', ' ', ' " This is a test " '])

    # Test is incomplete due to strlen against txt records
    def test_update_add_null_char_txt_record(self):
        """TXT strings containing a NUL byte are cut off at the NUL."""
        self._dns_add_query_match(' nulltextrec ',
                                  [' NULL \x00 BYTE '],
                                  ' " NULL " ',
                                  expected_txt=[' NULL '])

        self._dns_add_query_match(' nulltextrec2 ',
                                  [' NULL \x00 BYTE ', ' NULL \x00 BYTE '],
                                  ' " NULL " " NULL " ',
                                  expected_txt=[' NULL ', ' NULL '])

    def test_update_add_null_char_rpc_to_dns(self):
        """The truncated NUL-byte record added over RPC is visible over DNS."""
        self._rpc_add_query_delete(' rpc ' + ' nulltextrec ',
                                   ' " NULL " ',
                                   [' NULL '])

    def test_update_add_hex_char_txt_record(self):
        """A TXT string with a high byte round-trips from DNS to RPC."""
        self._dns_add_query_match(' hextextrec ',
                                  [' HIGH \xFF BYTE '],
                                  ' " HIGH \xFF BYTE " ')

    def test_update_add_hex_rpc_to_dns(self):
        """A TXT string with a high byte round-trips from RPC to DNS."""
        self._rpc_add_query_delete(' rpc ' + ' hextextrec ',
                                   ' " HIGH \xFF BYTE " ',
                                   [' HIGH \xFF BYTE '])

    def test_update_add_slash_txt_record(self):
        """A TXT string with a backslash round-trips from DNS to RPC."""
        self._dns_add_query_match(' slashtextrec ',
                                  [' Th \\ =is=is a test '],
                                  ' " Th \\ \\ =is=is a test " ')

    # This test fails against Windows as it eliminates slashes in RPC
    # One typical use for a slash is in records like 'var=value' to
    # escape '=' characters.
    def test_update_add_slash_rpc_to_dns(self):
        """A TXT string with a backslash round-trips from RPC to DNS."""
        self._rpc_add_query_delete(' rpc ' + ' slashtextrec ',
                                   ' " Th \\ \\ =is=is a test " ',
                                   [' Th \\ =is=is a test '])

    def test_update_add_two_txt_records(self):
        """A record holding two TXT strings round-trips from DNS to RPC."""
        self._dns_add_query_match(
            ' textrec2 ',
            [' " This is a test " ', ' " and this is a test, too " '],
            ' " \\ " This is a test \\ " " ' +
            ' " \\ " and this is a test, too \\ " " ')

    def test_update_add_two_rpc_to_dns(self):
        """A record holding two TXT strings round-trips from RPC to DNS."""
        self._rpc_add_query_delete(
            ' rpc ' + ' textrec2 ',
            ' " \\ " This is a test \\ " " ' +
            ' " \\ " and this is a test, too \\ " " ',
            [' " This is a test " ', ' " and this is a test, too " '])

    def test_update_add_empty_txt_records(self):
        """A TXT record with an empty string list round-trips from DNS to RPC."""
        self._dns_add_query_match(' emptytextrec ', [], ' ')

    def test_update_add_empty_rpc_to_dns(self):
        """A TXT record with an empty string list round-trips from RPC to DNS."""
        self._rpc_add_query_delete(' rpcemptytextrec ', ' ', [])
2016-01-29 17:03:56 +13:00
# Entry point: hand this module's tests to the subunit TestProgram runner,
# passing along the subunit options object defined earlier in this file.
TestProgram(module=__name__, opts=subunitopts)