1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-08 21:18:16 +03:00

dns.py: Test dns server reload zones from DSDB when are created or deleted

Signed-off-by: Samuel Cabrero <samuelcabrero@kernevil.me>

Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Reviewed-by: Garming Sam <garming@catalyst.net.nz>
This commit is contained in:
Samuel Cabrero 2014-12-16 18:04:13 +01:00 committed by Garming Sam
parent 4fb29e9347
commit 336ffb29b5

View File

@ -21,7 +21,9 @@ import random
import socket
import samba.ndr as ndr
import samba.dcerpc.dns as dns
from samba import credentials, param
from samba.tests import TestCase
from samba.dcerpc import dnsp, dnsserver
FILTER=''.join([(len(repr(chr(x)))==3) and chr(x) or '.' for x in range(256)])
@ -864,6 +866,82 @@ class TestInvalidQueries(DNSTest):
if s is not None:
s.close()
class TestZones(DNSTest):
def get_loadparm(self):
lp = param.LoadParm()
lp.load(os.getenv("SMB_CONF_PATH"))
return lp
def get_credentials(self, lp):
creds = credentials.Credentials()
creds.guess(lp)
creds.set_machine_account(lp)
creds.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
return creds
def setUp(self):
super(TestZones, self).setUp()
self.lp = self.get_loadparm()
self.creds = self.get_credentials(self.lp)
self.server = os.getenv("SERVER_IP")
self.zone = "test.lan"
self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s" % (self.server),
self.lp, self.creds)
def create_zone(self, zone):
zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
zone_create.pszZoneName = zone
zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
zone_create.fAging = 0
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
self.server,
None,
0,
'ZoneCreate',
dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
zone_create)
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,
self.server,
zone,
0,
'DeleteZoneFromDs',
dnsserver.DNSSRV_TYPEID_NULL,
None)
def test_soa_query(self):
zone = "test.lan"
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
questions = []
q = self.make_name_question(zone, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
questions.append(q)
self.finish_name_packet(p, questions)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
self.assertEquals(response.ancount, 0)
self.create_zone(zone)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
self.assertEquals(response.ancount, 1)
self.assertEquals(response.answers[0].rr_type, dns.DNS_QTYPE_SOA)
self.delete_zone(zone)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXDOMAIN)
self.assert_dns_opcode_equals(response, dns.DNS_OPCODE_QUERY)
self.assertEquals(response.ancount, 0)
if __name__ == "__main__":
import unittest