mirror of
https://github.com/samba-team/samba.git
synced 2025-01-18 06:04:06 +03:00
dns: more debug debug options in the tests
Signed-off-by: Kai Blin <kai@samba.org> Reviewed-By: Amitay Isaacs <amitay@gmail.com>
This commit is contained in:
parent
4364a3faf6
commit
223cf7fb30
@ -23,6 +23,9 @@ import samba.ndr as ndr
|
|||||||
import samba.dcerpc.dns as dns
|
import samba.dcerpc.dns as dns
|
||||||
from samba.tests import TestCase
|
from samba.tests import TestCase
|
||||||
|
|
||||||
|
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
|
||||||
|
|
||||||
|
|
||||||
class DNSTest(TestCase):
|
class DNSTest(TestCase):
|
||||||
|
|
||||||
def errstr(self, errcode):
|
def errstr(self, errcode):
|
||||||
@ -82,36 +85,53 @@ class DNSTest(TestCase):
|
|||||||
"Helper to get dns domain"
|
"Helper to get dns domain"
|
||||||
return os.getenv('REALM', 'example.com').lower()
|
return os.getenv('REALM', 'example.com').lower()
|
||||||
|
|
||||||
def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP')):
|
def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
|
||||||
"send a DNS query and read the reply"
|
"send a DNS query and read the reply"
|
||||||
s = None
|
s = None
|
||||||
try:
|
try:
|
||||||
send_packet = ndr.ndr_pack(packet)
|
send_packet = ndr.ndr_pack(packet)
|
||||||
|
if dump:
|
||||||
|
print self.hexdump(send_packet)
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
|
||||||
s.connect((host, 53))
|
s.connect((host, 53))
|
||||||
s.send(send_packet, 0)
|
s.send(send_packet, 0)
|
||||||
recv_packet = s.recv(2048, 0)
|
recv_packet = s.recv(2048, 0)
|
||||||
|
if dump:
|
||||||
|
print self.hexdump(recv_packet)
|
||||||
return ndr.ndr_unpack(dns.name_packet, recv_packet)
|
return ndr.ndr_unpack(dns.name_packet, recv_packet)
|
||||||
finally:
|
finally:
|
||||||
if s is not None:
|
if s is not None:
|
||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP')):
|
def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'), dump=False):
|
||||||
"send a DNS query and read the reply"
|
"send a DNS query and read the reply"
|
||||||
s = None
|
s = None
|
||||||
try:
|
try:
|
||||||
send_packet = ndr.ndr_pack(packet)
|
send_packet = ndr.ndr_pack(packet)
|
||||||
|
if dump:
|
||||||
|
print self.hexdump(send_packet)
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
|
||||||
s.connect((host, 53))
|
s.connect((host, 53))
|
||||||
tcp_packet = struct.pack('!H', len(send_packet))
|
tcp_packet = struct.pack('!H', len(send_packet))
|
||||||
tcp_packet += send_packet
|
tcp_packet += send_packet
|
||||||
s.send(tcp_packet, 0)
|
s.send(tcp_packet, 0)
|
||||||
recv_packet = s.recv(0xffff + 2, 0)
|
recv_packet = s.recv(0xffff + 2, 0)
|
||||||
|
if dump:
|
||||||
|
print self.hexdump(recv_packet)
|
||||||
return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
|
return ndr.ndr_unpack(dns.name_packet, recv_packet[2:])
|
||||||
finally:
|
finally:
|
||||||
if s is not None:
|
if s is not None:
|
||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
|
def hexdump(self, src, length=8):
|
||||||
|
N=0; result=''
|
||||||
|
while src:
|
||||||
|
s,src = src[:length],src[length:]
|
||||||
|
hexa = ' '.join(["%02X"%ord(x) for x in s])
|
||||||
|
s = s.translate(FILTER)
|
||||||
|
result += "%04X %-*s %s\n" % (N, length*3, hexa, s)
|
||||||
|
N+=length
|
||||||
|
return result
|
||||||
|
|
||||||
class TestSimpleQueries(DNSTest):
|
class TestSimpleQueries(DNSTest):
|
||||||
|
|
||||||
@ -550,8 +570,10 @@ class TestDNSUpdates(DNSTest):
|
|||||||
response = self.dns_transaction_udp(p)
|
response = self.dns_transaction_udp(p)
|
||||||
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
|
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
|
||||||
self.assertEqual(response.ancount, 1)
|
self.assertEqual(response.ancount, 1)
|
||||||
self.assertEqual(response.answers[0].rdata.preference, 10)
|
ans = response.answers[0]
|
||||||
self.assertEqual(response.answers[0].rdata.exchange, 'mail.%s' % self.get_dns_domain())
|
self.assertEqual(ans.rr_type, dns.DNS_QTYPE_MX)
|
||||||
|
self.assertEqual(ans.rdata.preference, 10)
|
||||||
|
self.assertEqual(ans.rdata.exchange, 'mail.%s' % self.get_dns_domain())
|
||||||
|
|
||||||
|
|
||||||
class TestComplexQueries(DNSTest):
|
class TestComplexQueries(DNSTest):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user