1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-20 22:50:26 +03:00

CVE-2020-25719 CVE-2020-25717 tests/krb5: Add tests for connecting to services anonymously and without a PAC

At the end of the patchset we assume NT_STATUS_NO_IMPERSONATION_TOKEN if
no PAC is available.

For now we want to look for ACCESS_DENIED as this allows
the test to pass (showing that gensec:require_pac = true
is a useful partial mitigation).

This will also help others doing backports that do not
take the full patch set.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14801
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14799
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14561
BUG: https://bugzilla.samba.org/show_bug.cgi?id=14556

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
This commit is contained in:
Stefan Metzmacher 2021-08-24 17:11:24 +02:00 committed by Jule Anger
parent 6dda0f61bb
commit e31b6f6094
5 changed files with 158 additions and 37 deletions

View File

@ -21,10 +21,11 @@ import sys
import os
from ldb import SCOPE_SUBTREE
from samba import gensec
from samba import NTSTATUSError, gensec
from samba.auth import AuthContext
from samba.dcerpc import security
from samba.ndr import ndr_unpack
from samba.ntstatus import NT_STATUS_ACCESS_DENIED
from samba.tests.krb5.kdc_base_test import KDCBaseTest
@ -41,11 +42,18 @@ class CcacheTests(KDCBaseTest):
"""
def test_ccache(self):
self._run_ccache_test("ccacheusr")
def test_ccache_no_pac(self):
self._run_ccache_test("ccacheusr_nopac", include_pac=False,
expect_anon=True, allow_error=True)
def _run_ccache_test(self, user_name, include_pac=True,
expect_anon=False, allow_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
user_name = "ccacheusr"
mach_name = "ccachemac"
service = "host"
@ -67,7 +75,10 @@ class CcacheTests(KDCBaseTest):
# ticket, to ensure that the krbtgt ticket doesn't also need to be
# stored.
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
mach_credentials)
mach_credentials,
pac=include_pac)
# Remove the cached credentials file.
self.addCleanup(os.remove, cachefile.name)
# Authenticate in-process to the machine account using the user's
# cached credentials.
@ -117,7 +128,16 @@ class CcacheTests(KDCBaseTest):
sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])
# Retrieve the SIDs from the security token.
session = gensec_server.session_info()
try:
session = gensec_server.session_info()
except NTSTATUSError as e:
if not allow_error:
self.fail()
enum, _ = e.args
self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
return
token = session.security_token
token_sids = token.sids
self.assertGreater(len(token_sids), 0)
@ -125,9 +145,6 @@ class CcacheTests(KDCBaseTest):
# Ensure that they match.
self.assertEqual(sid, token_sids[0])
# Remove the cached credentials file.
os.remove(cachefile.name)
if __name__ == "__main__":
global_asn1_print = False

View File

@ -20,10 +20,11 @@
import sys
import os
from ldb import SCOPE_BASE, SCOPE_SUBTREE
from ldb import LdbError, ERR_OPERATIONS_ERROR, SCOPE_BASE, SCOPE_SUBTREE
from samba.dcerpc import security
from samba.ndr import ndr_unpack
from samba.samdb import SamDB
from samba import credentials
from samba.tests.krb5.kdc_base_test import KDCBaseTest
@ -40,13 +41,20 @@ class LdapTests(KDCBaseTest):
"""
def test_ldap(self):
self._run_ldap_test("ldapusr")
def test_ldap_no_pac(self):
self._run_ldap_test("ldapusr_nopac", include_pac=False,
expect_anon=True, allow_error=True)
def _run_ldap_test(self, user_name, include_pac=True,
expect_anon=False, allow_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
user_name = "ldapusr"
mach_name = samdb.host_dns_name()
service = "ldap"
@ -62,7 +70,10 @@ class LdapTests(KDCBaseTest):
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
mach_credentials,
service,
mach_name)
mach_name,
pac=include_pac)
# Remove the cached credentials file.
self.addCleanup(os.remove, cachefile.name)
# Authenticate in-process to the machine account using the user's
# cached credentials.
@ -74,22 +85,61 @@ class LdapTests(KDCBaseTest):
self.assertEqual(1, len(ldb_res))
sid = ndr_unpack(security.dom_sid, ldb_res[0]["objectSid"][0])
# Connect to the machine account and retrieve the user SID.
try:
ldb_as_user = SamDB(url="ldap://%s" % mach_name,
credentials=creds,
lp=self.get_lp())
except LdbError as e:
if not allow_error:
self.fail()
enum, estr = e.args
self.assertEqual(ERR_OPERATIONS_ERROR, enum)
self.assertIn('NT_STATUS_ACCESS_DENIED', estr)
return
ldb_res = ldb_as_user.search('',
scope=SCOPE_BASE,
attrs=["tokenGroups"])
self.assertEqual(1, len(ldb_res))
token_groups = ldb_res[0]["tokenGroups"]
token_sid = ndr_unpack(security.dom_sid, token_groups[0])
if expect_anon:
# Ensure we got an anonymous token.
self.assertEqual(security.SID_NT_ANONYMOUS, str(token_sid))
token_sid = ndr_unpack(security.dom_sid, token_groups[1])
self.assertEqual(security.SID_NT_NETWORK, str(token_sid))
if len(token_groups) >= 3:
token_sid = ndr_unpack(security.dom_sid, token_groups[2])
self.assertEqual(security.SID_NT_THIS_ORGANISATION,
str(token_sid))
else:
# Ensure that they match.
self.assertEqual(sid, token_sid)
def test_ldap_anonymous(self):
samdb = self.get_samdb()
mach_name = samdb.host_dns_name()
anon_creds = credentials.Credentials()
anon_creds.set_anonymous()
# Connect to the machine account and retrieve the user SID.
ldb_as_user = SamDB(url="ldap://%s" % mach_name,
credentials=creds,
credentials=anon_creds,
lp=self.get_lp())
ldb_res = ldb_as_user.search('',
scope=SCOPE_BASE,
attrs=["tokenGroups"])
self.assertEqual(1, len(ldb_res))
# Ensure we got an anonymous token.
token_sid = ndr_unpack(security.dom_sid, ldb_res[0]["tokenGroups"][0])
# Ensure that they match.
self.assertEqual(sid, token_sid)
# Remove the cached credentials file.
os.remove(cachefile.name)
self.assertEqual(security.SID_NT_ANONYMOUS, str(token_sid))
self.assertEqual(len(ldb_res[0]["tokenGroups"]), 1)
if __name__ == "__main__":

View File

@ -20,7 +20,9 @@
import sys
import os
from samba import NTSTATUSError, credentials
from samba.dcerpc import lsa
from samba.ntstatus import NT_STATUS_ACCESS_DENIED
from samba.tests.krb5.kdc_base_test import KDCBaseTest
@ -37,13 +39,20 @@ class RpcTests(KDCBaseTest):
"""
def test_rpc(self):
self._run_rpc_test("rpcusr")
def test_rpc_no_pac(self):
self._run_rpc_test("rpcusr_nopac", include_pac=False,
expect_anon=True, allow_error=True)
def _run_rpc_test(self, user_name, include_pac=True,
expect_anon=False, allow_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
user_name = "rpcusr"
mach_name = samdb.host_dns_name()
service = "cifs"
@ -59,20 +68,45 @@ class RpcTests(KDCBaseTest):
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
mach_credentials,
service,
mach_name)
mach_name,
pac=include_pac)
# Remove the cached credentials file.
self.addCleanup(os.remove, cachefile.name)
# Authenticate in-process to the machine account using the user's
# cached credentials.
binding_str = "ncacn_np:%s[\\pipe\\lsarpc]" % mach_name
conn = lsa.lsarpc(binding_str, self.get_lp(), creds)
try:
conn = lsa.lsarpc(binding_str, self.get_lp(), creds)
except NTSTATUSError as e:
if not allow_error:
self.fail()
enum, _ = e.args
self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
return
(account_name, _) = conn.GetUserName(None, None, None)
self.assertEqual(user_name, account_name.string)
if expect_anon:
self.assertNotEqual(user_name, account_name.string)
else:
self.assertEqual(user_name, account_name.string)
# Remove the cached credentials file.
os.remove(cachefile.name)
def test_rpc_anonymous(self):
samdb = self.get_samdb()
mach_name = samdb.host_dns_name()
anon_creds = credentials.Credentials()
anon_creds.set_anonymous()
binding_str = "ncacn_np:%s[\\pipe\\lsarpc]" % mach_name
conn = lsa.lsarpc(binding_str, self.get_lp(), anon_creds)
(account_name, _) = conn.GetUserName(None, None, None)
self.assertEqual('ANONYMOUS LOGON', account_name.string)
if __name__ == "__main__":

View File

@ -21,8 +21,10 @@ import sys
import os
from ldb import SCOPE_SUBTREE
from samba import NTSTATUSError
from samba.dcerpc import security
from samba.ndr import ndr_unpack
from samba.ntstatus import NT_STATUS_ACCESS_DENIED
from samba.samba3 import libsmb_samba_internal as libsmb
from samba.samba3 import param as s3param
@ -41,13 +43,20 @@ class SmbTests(KDCBaseTest):
"""
def test_smb(self):
self._run_smb_test("smbusr")
def test_smb_no_pac(self):
self._run_smb_test("smbusr_nopac", include_pac=False,
expect_error=True)
def _run_smb_test(self, user_name, include_pac=True,
expect_error=False):
# Create a user account and a machine account, along with a Kerberos
# credentials cache file where the service ticket authenticating the
# user are stored.
samdb = self.get_samdb()
user_name = "smbusr"
mach_name = samdb.host_dns_name()
service = "cifs"
share = "tmp"
@ -64,7 +73,10 @@ class SmbTests(KDCBaseTest):
(creds, cachefile) = self.create_ccache_with_user(user_credentials,
mach_credentials,
service,
mach_name)
mach_name,
pac=include_pac)
# Remove the cached credentials file.
self.addCleanup(os.remove, cachefile.name)
# Set the Kerberos 5 credentials cache environment variable. This is
# required because the codepath that gets run (gse_krb5) looks for it
@ -95,16 +107,23 @@ class SmbTests(KDCBaseTest):
self.addCleanup(s3_lp.set, "client max protocol", max_protocol)
s3_lp.set("client max protocol", "NT1")
conn = libsmb.Conn(mach_name, share, lp=s3_lp, creds=creds)
try:
conn = libsmb.Conn(mach_name, share, lp=s3_lp, creds=creds)
except NTSTATUSError as e:
if not expect_error:
self.fail()
enum, _ = e.args
self.assertEqual(NT_STATUS_ACCESS_DENIED, enum)
return
else:
self.assertFalse(expect_error)
(uid, gid, gids, sids, guest) = conn.posix_whoami()
# Ensure that they match.
self.assertEqual(sid, sids[0])
# Remove the cached credentials file.
os.remove(cachefile.name)
if __name__ == "__main__":
global_asn1_print = False

View File

@ -850,14 +850,15 @@ planoldpythontestsuite("ad_dc_default", "samba.tests.krb5.test_ldap",
'FAST_SUPPORT': have_fast_support,
'TKT_SIG_SUPPORT': tkt_sig_support
})
planoldpythontestsuite("ad_dc_default", "samba.tests.krb5.test_rpc",
environ={
'ADMIN_USERNAME': '$USERNAME',
'ADMIN_PASSWORD': '$PASSWORD',
'STRICT_CHECKING': '0',
'FAST_SUPPORT': have_fast_support,
'TKT_SIG_SUPPORT': tkt_sig_support
})
for env in ['ad_dc_default', 'ad_member']:
planoldpythontestsuite(env, "samba.tests.krb5.test_rpc",
environ={
'ADMIN_USERNAME': '$DC_USERNAME',
'ADMIN_PASSWORD': '$DC_PASSWORD',
'STRICT_CHECKING': '0',
'FAST_SUPPORT': have_fast_support,
'TKT_SIG_SUPPORT': tkt_sig_support
})
planoldpythontestsuite("ad_dc_smb1", "samba.tests.krb5.test_smb",
environ={
'ADMIN_USERNAME': '$USERNAME',