1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-09 08:58:35 +03:00

CVE-2022-32743 tests/py_credentials: Add tests for setting dNSHostName with LogonGetDomainInfo()

Test that the value is properly validated, and that it can be set
regardless of rights on the account.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14833

Signed-off-by: Joseph Sutton <josephsutton@catalyst.net.nz>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
This commit is contained in:
Joseph Sutton 2022-06-07 17:35:35 +12:00 committed by Douglas Bagnall
parent d277700710
commit b41691d0e5
2 changed files with 281 additions and 2 deletions

View File

@ -18,6 +18,8 @@
from samba.tests import TestCase, delete_force
import os
import ldb
import samba
from samba.auth import system_session
from samba.credentials import (
@ -25,7 +27,7 @@ from samba.credentials import (
CLI_CRED_NTLMv2_AUTH,
CLI_CRED_NTLM_AUTH,
DONT_USE_KERBEROS)
from samba.dcerpc import netlogon, ntlmssp, srvsvc
from samba.dcerpc import lsa, netlogon, ntlmssp, security, srvsvc
from samba.dcerpc.netlogon import (
netr_Authenticator,
netr_WorkstationInformation,
@ -36,10 +38,11 @@ from samba.dsdb import (
UF_WORKSTATION_TRUST_ACCOUNT,
UF_PASSWD_NOTREQD,
UF_NORMAL_ACCOUNT)
from samba.ndr import ndr_pack
from samba.ndr import ndr_pack, ndr_unpack
from samba.samdb import SamDB
from samba import NTSTATUSError, ntstatus
from samba.common import get_string
from samba.sd_utils import SDUtils
import ctypes
@ -105,6 +108,280 @@ class PyCredentialsTests(TestCase):
(authenticator, subsequent) = self.get_authenticator(c)
self.do_NetrLogonGetDomainInfo(c, authenticator, subsequent)
# Test using LogonGetDomainInfo to update dNSHostName to an allowed value.
def test_set_dns_hostname_valid(self):
c = self.get_netlogon_connection()
authenticator, subsequent = self.get_authenticator(c)
domain_hostname = self.ldb.domain_dns_name()
new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
new_dns_hostname = new_dns_hostname.encode('utf-8')
query = netr_WorkstationInformation()
query.os_name = lsa.String('some OS')
query.dns_hostname = new_dns_hostname
c.netr_LogonGetDomainInfo(
server_name=self.server,
computer_name=self.user_creds.get_workstation(),
credential=authenticator,
return_authenticator=subsequent,
level=1,
query=query)
# Check the result.
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['dNSHostName'])
self.assertEqual(1, len(res))
got_dns_hostname = res[0].get('dNSHostName', idx=0)
self.assertEqual(new_dns_hostname, got_dns_hostname)
# Test using LogonGetDomainInfo to update dNSHostName to an allowed value,
# when we are denied the right to do so.
def test_set_dns_hostname_valid_denied(self):
c = self.get_netlogon_connection()
authenticator, subsequent = self.get_authenticator(c)
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['objectSid'])
self.assertEqual(1, len(res))
machine_sid = ndr_unpack(security.dom_sid,
res[0].get('objectSid', idx=0))
sd_utils = SDUtils(self.ldb)
# Deny Validated Write and Write Property.
mod = (f'(OD;;SWWP;{security.GUID_DRS_DNS_HOST_NAME};;'
f'{machine_sid})')
sd_utils.dacl_add_ace(self.machine_dn, mod)
domain_hostname = self.ldb.domain_dns_name()
new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
new_dns_hostname = new_dns_hostname.encode('utf-8')
query = netr_WorkstationInformation()
query.os_name = lsa.String('some OS')
query.dns_hostname = new_dns_hostname
c.netr_LogonGetDomainInfo(
server_name=self.server,
computer_name=self.user_creds.get_workstation(),
credential=authenticator,
return_authenticator=subsequent,
level=1,
query=query)
# Check the result.
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['dNSHostName'])
self.assertEqual(1, len(res))
got_dns_hostname = res[0].get('dNSHostName', idx=0)
self.assertEqual(new_dns_hostname, got_dns_hostname)
# Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
# invalid value, even with Validated Write.
def test_set_dns_hostname_invalid_validated_write(self):
c = self.get_netlogon_connection()
authenticator, subsequent = self.get_authenticator(c)
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['objectSid'])
self.assertEqual(1, len(res))
machine_sid = ndr_unpack(security.dom_sid,
res[0].get('objectSid', idx=0))
sd_utils = SDUtils(self.ldb)
# Grant Validated Write.
mod = (f'(OA;;SW;{security.GUID_DRS_DNS_HOST_NAME};;'
f'{machine_sid})')
sd_utils.dacl_add_ace(self.machine_dn, mod)
new_dns_hostname = b'invalid'
query = netr_WorkstationInformation()
query.os_name = lsa.String('some OS')
query.dns_hostname = new_dns_hostname
c.netr_LogonGetDomainInfo(
server_name=self.server,
computer_name=self.user_creds.get_workstation(),
credential=authenticator,
return_authenticator=subsequent,
level=1,
query=query)
# Check the result.
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['dNSHostName'])
self.assertEqual(1, len(res))
got_dns_hostname = res[0].get('dNSHostName', idx=0)
self.assertIsNone(got_dns_hostname)
# Ensure we can't use LogonGetDomainInfo to update dNSHostName to an
# invalid value, even with Write Property.
def test_set_dns_hostname_invalid_write_property(self):
c = self.get_netlogon_connection()
authenticator, subsequent = self.get_authenticator(c)
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['objectSid'])
self.assertEqual(1, len(res))
machine_sid = ndr_unpack(security.dom_sid,
res[0].get('objectSid', idx=0))
sd_utils = SDUtils(self.ldb)
# Grant Write Property.
mod = (f'(OA;;WP;{security.GUID_DRS_DNS_HOST_NAME};;'
f'{machine_sid})')
sd_utils.dacl_add_ace(self.machine_dn, mod)
new_dns_hostname = b'invalid'
query = netr_WorkstationInformation()
query.os_name = lsa.String('some OS')
query.dns_hostname = new_dns_hostname
c.netr_LogonGetDomainInfo(
server_name=self.server,
computer_name=self.user_creds.get_workstation(),
credential=authenticator,
return_authenticator=subsequent,
level=1,
query=query)
# Check the result.
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['dNSHostName'])
self.assertEqual(1, len(res))
got_dns_hostname = res[0].get('dNSHostName', idx=0)
self.assertIsNone(got_dns_hostname)
# Show we can't use LogonGetDomainInfo to set the dNSHostName to just the
# machine name.
def test_set_dns_hostname_to_machine_name(self):
c = self.get_netlogon_connection()
authenticator, subsequent = self.get_authenticator(c)
new_dns_hostname = self.machine_name.encode('utf-8')
query = netr_WorkstationInformation()
query.os_name = lsa.String('some OS')
query.dns_hostname = new_dns_hostname
c.netr_LogonGetDomainInfo(
server_name=self.server,
computer_name=self.user_creds.get_workstation(),
credential=authenticator,
return_authenticator=subsequent,
level=1,
query=query)
# Check the result.
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['dNSHostName'])
self.assertEqual(1, len(res))
got_dns_hostname = res[0].get('dNSHostName', idx=0)
self.assertIsNone(got_dns_hostname)
# Show we can't use LogonGetDomainInfo to set dNSHostName with an invalid
# suffix.
def test_set_dns_hostname_invalid_suffix(self):
c = self.get_netlogon_connection()
authenticator, subsequent = self.get_authenticator(c)
domain_hostname = self.ldb.domain_dns_name()
new_dns_hostname = f'{self.machine_name}.foo.{domain_hostname}'
new_dns_hostname = new_dns_hostname.encode('utf-8')
query = netr_WorkstationInformation()
query.os_name = lsa.String('some OS')
query.dns_hostname = new_dns_hostname
c.netr_LogonGetDomainInfo(
server_name=self.server,
computer_name=self.user_creds.get_workstation(),
credential=authenticator,
return_authenticator=subsequent,
level=1,
query=query)
# Check the result.
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['dNSHostName'])
self.assertEqual(1, len(res))
got_dns_hostname = res[0].get('dNSHostName', idx=0)
self.assertIsNone(got_dns_hostname)
# Test that setting the HANDLES_SPN_UPDATE flag inhibits the dNSHostName
# update, but other attributes are still updated.
def test_set_dns_hostname_with_flag(self):
c = self.get_netlogon_connection()
authenticator, subsequent = self.get_authenticator(c)
domain_hostname = self.ldb.domain_dns_name()
new_dns_hostname = f'{self.machine_name}.{domain_hostname}'
new_dns_hostname = new_dns_hostname.encode('utf-8')
operating_system = 'some OS'
query = netr_WorkstationInformation()
query.os_name = lsa.String(operating_system)
query.dns_hostname = new_dns_hostname
query.workstation_flags = netlogon.NETR_WS_FLAG_HANDLES_SPN_UPDATE
c.netr_LogonGetDomainInfo(
server_name=self.server,
computer_name=self.user_creds.get_workstation(),
credential=authenticator,
return_authenticator=subsequent,
level=1,
query=query)
# Check the result.
res = self.ldb.search(self.machine_dn,
scope=ldb.SCOPE_BASE,
attrs=['dNSHostName',
'operatingSystem'])
self.assertEqual(1, len(res))
got_dns_hostname = res[0].get('dNSHostName', idx=0)
self.assertIsNone(got_dns_hostname)
got_os = res[0].get('operatingSystem', idx=0)
self.assertEqual(operating_system.encode('utf-8'), got_os)
def test_SamLogonEx(self):
c = self.get_netlogon_connection()

View File

@ -0,0 +1,2 @@
^samba.tests.py_credentials.samba.tests.py_credentials.PyCredentialsTests.test_set_dns_hostname_invalid_suffix\(
^samba.tests.py_credentials.samba.tests.py_credentials.PyCredentialsTests.test_set_dns_hostname_with_flag\(