1
0
mirror of https://github.com/samba-team/samba.git synced 2025-08-03 04:22:09 +03:00

s4 dns: Support TXT updates, add tests

This commit is contained in:
Kai Blin
2012-03-10 23:48:44 +01:00
parent e6c6f49595
commit 98ae3592ad
2 changed files with 110 additions and 8 deletions

View File

@ -285,6 +285,10 @@ static WERROR dns_rr_to_dnsp(TALLOC_CTX *mem_ctx,
const struct dns_res_rec *rrec,
struct dnsp_DnssrvRpcRecord *r)
{
char *tmp;
char *txt_record_txt;
char *saveptr = NULL;
if (rrec->rr_type == DNS_QTYPE_ALL) {
return DNS_ERR(FORMAT_ERROR);
}
@ -334,15 +338,30 @@ static WERROR dns_rr_to_dnsp(TALLOC_CTX *mem_ctx,
W_ERROR_HAVE_NO_MEMORY(r->data.mx.nameTarget);
break;
case DNS_QTYPE_TXT:
/* FIXME: This converts the TXT rr data into a single string.
* Since dns server does not reply to qtype TXT,
* this is not yet relevant.
*/
r->data.txt.count = 1;
r->data.txt.str = talloc_array(mem_ctx, const char *, 1);
r->data.txt.count = 0;
r->data.txt.str = talloc_array(mem_ctx, const char *,
r->data.txt.count);
W_ERROR_HAVE_NO_MEMORY(r->data.txt.str);
r->data.txt.str[0] = talloc_strdup(mem_ctx, rrec->rdata.txt_record.txt);
W_ERROR_HAVE_NO_MEMORY(r->data.txt.str[0]);
txt_record_txt = talloc_strdup(r->data.txt.str,
rrec->rdata.txt_record.txt);
W_ERROR_HAVE_NO_MEMORY(txt_record_txt);
tmp = strtok_r(txt_record_txt, "\"", &saveptr);
while (tmp) {
if (strcmp(tmp, " ") == 0) {
tmp = strtok_r(NULL, "\"", &saveptr);
continue;
}
r->data.txt.str = talloc_realloc(mem_ctx, r->data.txt.str, const char *,
r->data.txt.count+1);
r->data.txt.str[r->data.txt.count] = talloc_strdup(r->data.txt.str, tmp);
W_ERROR_HAVE_NO_MEMORY(r->data.txt.str[r->data.txt.count]);
r->data.txt.count++;
tmp = strtok_r(NULL, "\"", &saveptr);
}
break;
default:
DEBUG(0, ("Got a qytpe of %d\n", rrec->rr_type));
@ -390,6 +409,8 @@ static WERROR handle_one_update(struct dns_server *dns,
case DNS_QTYPE_TXT:
break;
default:
DEBUG(0, ("Can't handle updates of type %u yet\n",
update->rr_type));
return DNS_ERR(NOT_IMPLEMENTED);
}

View File

@ -349,6 +349,87 @@ class DNSTest(TestCase):
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_NXRRSET)
def test_update_add_txt_record(self):
"test adding records works"
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
updates = []
name = self.get_dns_domain()
u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
updates.append(u)
self.finish_name_packet(p, updates)
updates = []
r = dns.res_rec()
r.name = "textrec.%s" % self.get_dns_domain()
r.rr_type = dns.DNS_QTYPE_TXT
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
r.rdata = dns.txt_record()
r.rdata.txt = '"This is a test"'
updates.append(r)
p.nscount = len(updates)
p.nsrecs = updates
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
questions = []
name = "textrec.%s" % self.get_dns_domain()
q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
questions.append(q)
self.finish_name_packet(p, questions)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.assertEquals(response.ancount, 1)
self.assertEquals(response.answers[0].rdata.txt, '"This is a test"')
def test_update_add_two_txt_records(self):
"test adding two txt records works"
p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
updates = []
name = self.get_dns_domain()
u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
updates.append(u)
self.finish_name_packet(p, updates)
updates = []
r = dns.res_rec()
r.name = "textrec2.%s" % self.get_dns_domain()
r.rr_type = dns.DNS_QTYPE_TXT
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
r.rdata = dns.txt_record()
r.rdata.txt = '"This is a test" "and this is a test, too"'
updates.append(r)
p.nscount = len(updates)
p.nsrecs = updates
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
questions = []
name = "textrec2.%s" % self.get_dns_domain()
q = self.make_name_question(name, dns.DNS_QTYPE_TXT, dns.DNS_QCLASS_IN)
questions.append(q)
self.finish_name_packet(p, questions)
response = self.dns_transaction_udp(p)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.assertEquals(response.ancount, 1)
self.assertEquals(response.answers[0].rdata.txt, '"This is a test" "and this is a test, too"')
if __name__ == "__main__":
import unittest
unittest.main()