mirror of
https://github.com/samba-team/samba.git
synced 2025-01-12 09:18:10 +03:00
CVE-2016-0771: tests/dns: Remove dependencies on env variables
Now that it is invoked as a normal script, there should be less of them. BUG: https://bugzilla.samba.org/show_bug.cgi?id=11128 BUG: https://bugzilla.samba.org/show_bug.cgi?id=11686 Signed-off-by: Garming Sam <garming@catalyst.net.nz> Reviewed-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
parent
9f1ba00f1f
commit
c37c4b18e0
@ -132,9 +132,9 @@ class DNSTest(TestCase):
|
|||||||
|
|
||||||
def get_dns_domain(self):
|
def get_dns_domain(self):
|
||||||
"Helper to get dns domain"
|
"Helper to get dns domain"
|
||||||
return os.getenv('REALM', 'example.com').lower()
|
return self.creds.get_realm().lower()
|
||||||
|
|
||||||
def dns_transaction_udp(self, packet, host=os.getenv('SERVER_IP'),
|
def dns_transaction_udp(self, packet, host=server_ip,
|
||||||
dump=False, timeout=timeout):
|
dump=False, timeout=timeout):
|
||||||
"send a DNS query and read the reply"
|
"send a DNS query and read the reply"
|
||||||
s = None
|
s = None
|
||||||
@ -154,7 +154,7 @@ class DNSTest(TestCase):
|
|||||||
if s is not None:
|
if s is not None:
|
||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
def dns_transaction_tcp(self, packet, host=os.getenv('SERVER_IP'),
|
def dns_transaction_tcp(self, packet, host=server_ip,
|
||||||
dump=False, timeout=timeout):
|
dump=False, timeout=timeout):
|
||||||
"send a DNS query and read the reply"
|
"send a DNS query and read the reply"
|
||||||
s = None
|
s = None
|
||||||
@ -221,7 +221,7 @@ class TestSimpleQueries(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
||||||
print "asking for ", q.name
|
print "asking for ", q.name
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
@ -232,14 +232,14 @@ class TestSimpleQueries(DNSTest):
|
|||||||
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
||||||
self.assertEquals(response.ancount, 1)
|
self.assertEquals(response.ancount, 1)
|
||||||
self.assertEquals(response.answers[0].rdata,
|
self.assertEquals(response.answers[0].rdata,
|
||||||
os.getenv('SERVER_IP'))
|
self.server_ip)
|
||||||
|
|
||||||
def test_one_a_query_tcp(self):
|
def test_one_a_query_tcp(self):
|
||||||
"create a query packet containing one query record via TCP"
|
"create a query packet containing one query record via TCP"
|
||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
||||||
print "asking for ", q.name
|
print "asking for ", q.name
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
@ -250,14 +250,14 @@ class TestSimpleQueries(DNSTest):
|
|||||||
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
||||||
self.assertEquals(response.ancount, 1)
|
self.assertEquals(response.ancount, 1)
|
||||||
self.assertEquals(response.answers[0].rdata,
|
self.assertEquals(response.answers[0].rdata,
|
||||||
os.getenv('SERVER_IP'))
|
self.server_ip)
|
||||||
|
|
||||||
def test_one_mx_query(self):
|
def test_one_mx_query(self):
|
||||||
"create a query packet causing an empty RCODE_OK answer"
|
"create a query packet causing an empty RCODE_OK answer"
|
||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
|
||||||
print "asking for ", q.name
|
print "asking for ", q.name
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
@ -271,7 +271,7 @@ class TestSimpleQueries(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "invalid-%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "invalid-%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_MX, dns.DNS_QCLASS_IN)
|
||||||
print "asking for ", q.name
|
print "asking for ", q.name
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
@ -287,7 +287,7 @@ class TestSimpleQueries(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
|
|
||||||
@ -311,7 +311,7 @@ class TestSimpleQueries(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_IN)
|
||||||
print "asking for ", q.name
|
print "asking for ", q.name
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
@ -328,7 +328,7 @@ class TestSimpleQueries(DNSTest):
|
|||||||
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
||||||
self.assertEquals(response.ancount, num_answers)
|
self.assertEquals(response.ancount, num_answers)
|
||||||
self.assertEquals(response.answers[0].rdata,
|
self.assertEquals(response.answers[0].rdata,
|
||||||
os.getenv('SERVER_IP'))
|
self.server_ip)
|
||||||
if dc_ipv6 is not None:
|
if dc_ipv6 is not None:
|
||||||
self.assertEquals(response.answers[1].rdata, dc_ipv6)
|
self.assertEquals(response.answers[1].rdata, dc_ipv6)
|
||||||
|
|
||||||
@ -337,7 +337,7 @@ class TestSimpleQueries(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
|
q = self.make_name_question(name, dns.DNS_QTYPE_ALL, dns.DNS_QCLASS_NONE)
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
|
|
||||||
@ -357,7 +357,7 @@ class TestSimpleQueries(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
|
|
||||||
@ -394,7 +394,7 @@ class TestDNSUpdates(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
|
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
|
||||||
updates = []
|
updates = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
u = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
||||||
updates.append(u)
|
updates.append(u)
|
||||||
|
|
||||||
@ -439,7 +439,7 @@ class TestDNSUpdates(DNSTest):
|
|||||||
|
|
||||||
prereqs = []
|
prereqs = []
|
||||||
r = dns.res_rec()
|
r = dns.res_rec()
|
||||||
r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
r.name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
r.rr_type = dns.DNS_QTYPE_TXT
|
r.rr_type = dns.DNS_QTYPE_TXT
|
||||||
r.rr_class = dns.DNS_QCLASS_NONE
|
r.rr_class = dns.DNS_QCLASS_NONE
|
||||||
r.ttl = 1
|
r.ttl = 1
|
||||||
@ -472,7 +472,7 @@ class TestDNSUpdates(DNSTest):
|
|||||||
|
|
||||||
prereqs = []
|
prereqs = []
|
||||||
r = dns.res_rec()
|
r = dns.res_rec()
|
||||||
r.name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
r.name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
r.rr_type = dns.DNS_QTYPE_TXT
|
r.rr_type = dns.DNS_QTYPE_TXT
|
||||||
r.rr_class = dns.DNS_QCLASS_ANY
|
r.rr_class = dns.DNS_QCLASS_ANY
|
||||||
r.ttl = 0
|
r.ttl = 0
|
||||||
@ -777,7 +777,7 @@ class TestComplexQueries(DNSTest):
|
|||||||
r.rr_class = dns.DNS_QCLASS_IN
|
r.rr_class = dns.DNS_QCLASS_IN
|
||||||
r.ttl = 900
|
r.ttl = 900
|
||||||
r.length = 0xffff
|
r.length = 0xffff
|
||||||
r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
updates.append(r)
|
updates.append(r)
|
||||||
p.nscount = len(updates)
|
p.nscount = len(updates)
|
||||||
p.nsrecs = updates
|
p.nsrecs = updates
|
||||||
@ -803,7 +803,7 @@ class TestComplexQueries(DNSTest):
|
|||||||
r.rr_class = dns.DNS_QCLASS_NONE
|
r.rr_class = dns.DNS_QCLASS_NONE
|
||||||
r.ttl = 0
|
r.ttl = 0
|
||||||
r.length = 0xffff
|
r.length = 0xffff
|
||||||
r.rdata = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
r.rdata = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
updates.append(r)
|
updates.append(r)
|
||||||
p.nscount = len(updates)
|
p.nscount = len(updates)
|
||||||
p.nsrecs = updates
|
p.nsrecs = updates
|
||||||
@ -828,10 +828,10 @@ class TestComplexQueries(DNSTest):
|
|||||||
self.assertEquals(response.ancount, 2)
|
self.assertEquals(response.ancount, 2)
|
||||||
self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
|
self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_CNAME)
|
||||||
self.assertEquals(response.answers[0].rdata, "%s.%s" %
|
self.assertEquals(response.answers[0].rdata, "%s.%s" %
|
||||||
(os.getenv('SERVER'), self.get_dns_domain()))
|
(self.server, self.get_dns_domain()))
|
||||||
self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
|
self.assertEquals(response.answers[1].rr_type, dns.DNS_QTYPE_A)
|
||||||
self.assertEquals(response.answers[1].rdata,
|
self.assertEquals(response.answers[1].rdata,
|
||||||
os.getenv('SERVER_IP'))
|
self.server_ip)
|
||||||
|
|
||||||
class TestInvalidQueries(DNSTest):
|
class TestInvalidQueries(DNSTest):
|
||||||
|
|
||||||
@ -841,7 +841,7 @@ class TestInvalidQueries(DNSTest):
|
|||||||
s = None
|
s = None
|
||||||
try:
|
try:
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
|
||||||
s.connect((os.getenv('SERVER_IP'), 53))
|
s.connect((self.server_ip, 53))
|
||||||
s.send("", 0)
|
s.send("", 0)
|
||||||
finally:
|
finally:
|
||||||
if s is not None:
|
if s is not None:
|
||||||
@ -850,7 +850,7 @@ class TestInvalidQueries(DNSTest):
|
|||||||
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
|
||||||
questions = []
|
questions = []
|
||||||
|
|
||||||
name = "%s.%s" % (os.getenv('SERVER'), self.get_dns_domain())
|
name = "%s.%s" % (self.server, self.get_dns_domain())
|
||||||
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
q = self.make_name_question(name, dns.DNS_QTYPE_A, dns.DNS_QCLASS_IN)
|
||||||
print "asking for ", q.name
|
print "asking for ", q.name
|
||||||
questions.append(q)
|
questions.append(q)
|
||||||
@ -861,7 +861,7 @@ class TestInvalidQueries(DNSTest):
|
|||||||
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
|
||||||
self.assertEquals(response.ancount, 1)
|
self.assertEquals(response.ancount, 1)
|
||||||
self.assertEquals(response.answers[0].rdata,
|
self.assertEquals(response.answers[0].rdata,
|
||||||
os.getenv('SERVER_IP'))
|
self.server_ip)
|
||||||
|
|
||||||
def test_one_a_reply(self):
|
def test_one_a_reply(self):
|
||||||
"send a reply instead of a query"
|
"send a reply instead of a query"
|
||||||
@ -882,7 +882,7 @@ class TestInvalidQueries(DNSTest):
|
|||||||
send_packet = ndr.ndr_pack(p)
|
send_packet = ndr.ndr_pack(p)
|
||||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
|
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
|
||||||
s.settimeout(timeout)
|
s.settimeout(timeout)
|
||||||
host=os.getenv('SERVER_IP')
|
host=self.server_ip
|
||||||
s.connect((host, 53))
|
s.connect((host, 53))
|
||||||
tcp_packet = struct.pack('!H', len(send_packet))
|
tcp_packet = struct.pack('!H', len(send_packet))
|
||||||
tcp_packet += send_packet
|
tcp_packet += send_packet
|
||||||
@ -900,18 +900,8 @@ class TestInvalidQueries(DNSTest):
|
|||||||
s.close()
|
s.close()
|
||||||
|
|
||||||
class TestZones(DNSTest):
|
class TestZones(DNSTest):
|
||||||
def get_credentials(self, lp):
|
|
||||||
creds = credentials.Credentials()
|
|
||||||
creds.guess(lp)
|
|
||||||
creds.set_machine_account(lp)
|
|
||||||
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
|
|
||||||
return creds
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestZones, self).setUp()
|
super(TestZones, self).setUp()
|
||||||
self.lp = self.get_loadparm()
|
|
||||||
self.creds = self.get_credentials(self.lp)
|
|
||||||
self.server = os.getenv("SERVER_IP")
|
|
||||||
self.zone = "test.lan"
|
self.zone = "test.lan"
|
||||||
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
|
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
|
||||||
self.lp, self.creds)
|
self.lp, self.creds)
|
||||||
@ -979,18 +969,8 @@ class TestZones(DNSTest):
|
|||||||
self.assertEquals(response.ancount, 0)
|
self.assertEquals(response.ancount, 0)
|
||||||
|
|
||||||
class TestRPCRoundtrip(DNSTest):
|
class TestRPCRoundtrip(DNSTest):
|
||||||
def get_credentials(self, lp):
|
|
||||||
creds = credentials.Credentials()
|
|
||||||
creds.guess(lp)
|
|
||||||
creds.set_machine_account(lp)
|
|
||||||
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
|
|
||||||
return creds
|
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestRPCRoundtrip, self).setUp()
|
super(TestRPCRoundtrip, self).setUp()
|
||||||
self.lp = self.get_loadparm()
|
|
||||||
self.creds = self.get_credentials(self.lp)
|
|
||||||
self.server = os.getenv("SERVER_IP")
|
|
||||||
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
|
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
|
||||||
self.lp, self.creds)
|
self.lp, self.creds)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user