# Unix SMB/CIFS implementation.
# Copyright (C) Kai Blin <kai@samba.org> 2011
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import os
import struct
import random
from samba import socket
import samba . ndr as ndr
import samba . dcerpc . dns as dns
from samba . tests import TestCase
class DNSTest(TestCase):
    """Shared helpers for the DNS tests in this file.

    Provides builders for dns.name_packet / dns.name_question objects,
    UDP and TCP transports that send a packed packet to port 53 of a host
    and unpack the reply, and assertions on the RCODE and OPCODE bits of
    a reply's ``operation`` field.
    """

    def errstr(self, errcode):
        """Return a readable name for the numeric RCODE ``errcode``.

        Values outside the table are rendered as ``UNKNOWN(<n>)`` rather
        than raising IndexError, so that building an assertion message for
        an unexpected RCODE does not mask the assertion failure itself.
        """
        string_codes = [
            "OK",
            "FORMERR",
            "SERVFAIL",
            "NXDOMAIN",
            "NOTIMP",
            "REFUSED",
            "YXDOMAIN",
            "YXRRSET",
            "NXRRSET",
            "NOTAUTH",
            "NOTZONE",
        ]
        if 0 <= errcode < len(string_codes):
            return string_codes[errcode]
        return "UNKNOWN(%s)" % errcode

    def assert_dns_rcode_equals(self, packet, rcode):
        """Assert that the low four bits of ``packet.operation`` equal ``rcode``."""
        p_errcode = packet.operation & 0x000F
        self.assertEquals(p_errcode, rcode, "Expected RCODE %s, got %s" %
                          (self.errstr(rcode), self.errstr(p_errcode)))

    def assert_dns_opcode_equals(self, packet, opcode):
        """Assert that ``packet.operation`` masked with 0x7800 equals ``opcode``."""
        p_opcode = packet.operation & 0x7800
        self.assertEquals(p_opcode, opcode, "Expected OPCODE %s, got %s" %
                          (opcode, p_opcode))

    def make_name_packet(self, opcode, qid=None):
        """Create a dns.name_packet with the given operation and no questions.

        ``qid`` is used as the packet id when given; otherwise a random
        16-bit id is chosen.
        """
        p = dns.name_packet()
        # Honour an explicit id; previously a caller-supplied qid was dropped.
        p.id = random.randint(0x0, 0xffff) if qid is None else qid
        p.operation = opcode
        p.questions = []
        return p

    def finish_name_packet(self, packet, questions):
        """Attach ``questions`` to ``packet`` and set its qdcount to match."""
        packet.qdcount = len(questions)
        packet.questions = questions

    def make_name_question(self, name, qtype, qclass):
        """Create a dns.name_question for ``name`` with the given type and class."""
        q = dns.name_question()
        q.name = name
        q.question_type = qtype
        q.question_class = qclass
        return q

    def get_dns_domain(self):
        """Return the lower-cased REALM environment variable.

        Falls back to 'example.com' when REALM is not set.
        """
        return os.getenv('REALM', 'example.com').lower()

    # NOTE: the ``host`` default below is read from the environment once,
    # when the class body is executed, not on every call.
    def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP')):
        """Send ``packet`` to ``host`` port 53 over UDP and return the reply.

        The packet is NDR-packed before sending; up to 2048 bytes of reply
        are read and unpacked as a dns.name_packet.  The socket is closed
        whether or not the exchange succeeds.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((host, 53))
            s.send(send_packet, 0)
            recv_packet = s.recv(2048, 0)
            return ndr.ndr_unpack(dns.name_packet, recv_packet)
        finally:
            if s is not None:
                s.close()

    def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP')):
        """Send ``packet`` to ``host`` port 53 over TCP and return the reply.

        The NDR-packed packet is prefixed with its length as a two-byte
        network-order integer.  The first two bytes of the reply are
        skipped and the remainder is unpacked as a dns.name_packet.  The
        socket is closed whether or not the exchange succeeds.
        """
        s = None
        try:
            send_packet = ndr.ndr_pack(packet)
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
            s.connect((host, 53))
            tcp_packet = struct.pack('!H', len(send_packet)) + send_packet
            s.send(tcp_packet, 0)
            # NOTE(review): a single recv() is assumed to deliver the whole
            # reply; the length prefix is not used to detect a short read.
            recv_packet = s.recv(0xffff + 2, 0)
            return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
        finally:
            if s is not None:
                s.close()

class TestSimpleQueries(DNSTest):
    """Query tests against the record of the host named by SERVER.

    Expected addresses come from the SERVER_IP and SERVER_IPV6 environment
    variables; the zone name comes from get_dns_domain().
    """

    def test_one_a_query(self):
        "create a query packet containing one query record"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Single-argument print() emits the same text on Python 2 and 3.
        print("asking for %s" % q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, 1)
        self.assertEquals(response.answers[0].rdata,
                          os.getenv('SERVER_IP'))

    def test_one_a_query_tcp(self):
        "create a query packet containing one query record via TCP"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        print("asking for %s" % q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_tcp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, 1)
        self.assertEquals(response.answers[0].rdata,
                          os.getenv('SERVER_IP'))

    def test_two_queries(self):
        "create a query packet containing two query records"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        name = "%s.%s" % ('bogusname', self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        # More than one question in a query is expected to be rejected.
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_qtype_all_query(self):
        "create a QTYPE_ALL query"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
        print("asking for %s" % q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)

        # One answer for SERVER_IP, plus one more when SERVER_IPV6 is set.
        num_answers = 1
        dc_ipv6 = os.getenv('SERVER_IPV6')
        if dc_ipv6 is not None:
            num_answers += 1

        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, num_answers)
        self.assertEquals(response.answers[0].rdata,
                          os.getenv('SERVER_IP'))
        if dc_ipv6 is not None:
            self.assertEquals(response.answers[1].rdata, dc_ipv6)

    def test_qclass_none_query(self):
        "create a QCLASS_NONE query"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    # Only returns an authority section entry in BIND and Win DNS
    # FIXME: Enable once Samba implements this feature
    # def test_soa_hostname_query(self):
    #     "create a SOA query for a hostname"
    #     p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
    #     questions = []
    #
    #     name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
    #     q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    #     questions.append(q)
    #
    #     self.finish_name_packet(p, questions)
    #     response = self.dns_transaction_udp(p)
    #     self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
    #     self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
    #     # We don't get SOA records for single hosts
    #     self.assertEquals(response.ancount, 0)

    def test_soa_domain_query(self):
        "create a SOA query for a domain"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, 1)

class TestDNSUpdates(DNSTest):
    """Tests that send DNS_OPCODE_UPDATE packets over UDP.

    Most tests put a single SOA question for get_dns_domain() into the
    packet, then place records in either the answers list (prerequisites)
    or the nsrecs list (updates) before sending.
    """

    def _make_zone_update_packet(self):
        "Helper creating an UPDATE packet with one SOA question for our domain"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])
        return p

    def _make_txt_rr(self, name, rr_class, ttl, txt):
        "Helper creating a TXT dns.res_rec carrying the string txt"
        r = dns.res_rec()
        r.name = name
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = dns.txt_record()
        r.rdata.txt = txt
        return r

    def _send_update(self, packet, records):
        "Helper storing records in packet.nsrecs, sending it and returning the reply"
        packet.nscount = len(records)
        packet.nsrecs = records
        return self.dns_transaction_udp(packet)

    def _query_txt(self, name):
        "Helper sending a TXT/IN query for name and returning the reply"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [q])
        return self.dns_transaction_udp(p)

    def test_two_updates(self):
        "create two update requests"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_wrong_qclass(self):
        "create update with DNS_QCLASS_NONE"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        updates = []

        name = self.get_dns_domain()
        u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_NONE)
        updates.append(u)

        self.finish_name_packet(p, updates)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NOTIMP)

    def test_update_prereq_with_non_null_ttl(self):
        "test update with a non-null TTL"
        p = self._make_zone_update_packet()

        prereqs = []
        r = dns.res_rec()
        r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_NONE
        r.ttl = 1
        r.length = 0
        prereqs.append(r)

        p.ancount = len(prereqs)
        p.answers = prereqs

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    # I'd love to test this one, but it segfaults. :)
    # def test_update_prereq_with_non_null_length(self):
    #     "test update with a non-null length"
    #     p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
    #     updates = []
    #
    #     name = self.get_dns_domain()
    #
    #     u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
    #     updates.append(u)
    #     self.finish_name_packet(p, updates)
    #
    #     prereqs = []
    #     r = dns.res_rec()
    #     r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
    #     r.rr_type = dns.DNS_QTYPE_TXT
    #     r.rr_class = dns.DNS_QCLASS_ANY
    #     r.ttl = 0
    #     r.length = 1
    #     prereqs.append(r)
    #
    #     p.ancount = len(prereqs)
    #     p.answers = prereqs
    #
    #     response = self.dns_transaction_udp(p)
    #     self.assert_dns_rcode_equals(response, dns.DNS_RCODE_FORMERR)

    def test_update_prereq_nonexisting_name(self):
        "test update with a nonexisting name"
        p = self._make_zone_update_packet()

        prereqs = []
        r = dns.res_rec()
        r.name = "idontexist.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_TXT
        r.rr_class = dns.DNS_QCLASS_ANY
        r.ttl = 0
        r.length = 0
        prereqs.append(r)

        p.ancount = len(prereqs)
        p.answers = prereqs

        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)

    def test_update_add_txt_record(self):
        "test adding records works"
        name = "textrec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        p = self._make_zone_update_packet()
        r = self._make_txt_rr(name, dns.DNS_QCLASS_IN, 900, txt)
        response = self._send_update(p, [r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back to confirm the update was stored.
        response = self._query_txt(name)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEquals(response.ancount, 1)
        self.assertEquals(response.answers[0].rdata.txt, txt)

    def test_update_add_two_txt_records(self):
        "test adding two txt records works"
        name = "textrec2.%s" % self.get_dns_domain()
        txt = '"This is a test" "and this is a test, too"'

        p = self._make_zone_update_packet()
        r = self._make_txt_rr(name, dns.DNS_QCLASS_IN, 900, txt)
        response = self._send_update(p, [r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Read the record back to confirm the update was stored.
        response = self._query_txt(name)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assertEquals(response.ancount, 1)
        self.assertEquals(response.answers[0].rdata.txt, txt)

    def test_delete_record(self):
        "Test if deleting records works"
        NAME = "deleterec.%s" % self.get_dns_domain()
        txt = '"This is a test"'

        # First, create a record to make sure we have a record to delete.
        p = self._make_zone_update_packet()
        r = self._make_txt_rr(NAME, dns.DNS_QCLASS_IN, 900, txt)
        response = self._send_update(p, [r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now check the record is around
        response = self._query_txt(NAME)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # Now delete the record
        p = self._make_zone_update_packet()
        r = self._make_txt_rr(NAME, dns.DNS_QCLASS_NONE, 0, txt)
        response = self._send_update(p, [r])
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

        # And finally check it's gone
        response = self._query_txt(NAME)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)

class TestComplexQueries(DNSTest):
    """Query tests that need a CNAME record to exist.

    setUp() sends an update adding ``cname_test.<domain>`` as a CNAME for
    ``<SERVER>.<domain>``; tearDown() sends the matching update with class
    NONE and TTL 0.
    """

    def _send_cname_update(self, rr_class, ttl):
        "Helper sending an update for the cname_test record and checking for OK"
        p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
        u = self.make_name_question(self.get_dns_domain(),
                                    dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
        self.finish_name_packet(p, [u])

        r = dns.res_rec()
        r.name = "cname_test.%s" % self.get_dns_domain()
        r.rr_type = dns.DNS_QTYPE_CNAME
        r.rr_class = rr_class
        r.ttl = ttl
        r.length = 0xffff
        r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())

        updates = [r]
        p.nscount = len(updates)
        p.nsrecs = updates
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)

    def setUp(self):
        super(TestComplexQueries, self).setUp()
        self._send_cname_update(dns.DNS_QCLASS_IN, 900)

    def tearDown(self):
        super(TestComplexQueries, self).tearDown()
        self._send_cname_update(dns.DNS_QCLASS_NONE, 0)

    def test_one_a_query(self):
        "create a query packet containing one query record"
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "cname_test.%s" % self.get_dns_domain()
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Single-argument print() emits the same text on Python 2 and 3.
        print("asking for %s" % q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        # Expect the CNAME itself followed by the A record it points to.
        self.assertEquals(response.ancount, 2)
        self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
        self.assertEquals(response.answers[0].rdata, "%s.%s" %
                          (os.getenv('SERVER'), self.get_dns_domain()))
        self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
        self.assertEquals(response.answers[1].rdata,
                          os.getenv('SERVER_IP'))

class TestInvalidQueries(DNSTest):
    """Tests that send an unusual datagram before a regular query."""

    def test_one_a_query(self):
        "send 0 bytes followed by create a query packet containing one query record"
        # Send an empty datagram first; no reply is read for it.
        s = None
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
            s.connect((os.getenv('SERVER_IP'), 53))
            s.send("", 0)
        finally:
            if s is not None:
                s.close()

        # A regular A query sent afterwards must still be answered.
        p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
        questions = []

        name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
        q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
        # Single-argument print() emits the same text on Python 2 and 3.
        print("asking for %s" % q.name)
        questions.append(q)

        self.finish_name_packet(p, questions)
        response = self.dns_transaction_udp(p)
        self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
        self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
        self.assertEquals(response.ancount, 1)
        self.assertEquals(response.answers[0].rdata,
                          os.getenv('SERVER_IP'))

# Run the tests in this file when it is executed as a script.
if __name__ == "__main__":
    import unittest
    unittest.main()