1
0
mirror of https://github.com/samba-team/samba.git synced 2025-12-17 04:23:50 +03:00

python:lsa_utils: Fix fallback to OpenPolicy2

BUG: https://bugzilla.samba.org/show_bug.cgi?id=15680

Pair-Programmed-With: Andreas Schneider <asn@samba.org>
Signed-off-by: Andreas Schneider <asn@samba.org>
Signed-off-by: Stefan Metzmacher <metze@samba.org>

Autobuild-User(master): Andreas Schneider <asn@cryptomilk.org>
Autobuild-Date(master): Mon Feb 17 18:33:15 UTC 2025 on atb-devel-224
This commit is contained in:
Stefan Metzmacher
2024-07-17 18:12:31 +02:00
committed by Andreas Schneider
parent f9a3fc19f1
commit a814f5d90a
4 changed files with 128 additions and 97 deletions

View File

@@ -20,24 +20,27 @@ from samba.dcerpc import lsa, drsblobs, misc
from samba.ndr import ndr_pack from samba.ndr import ndr_pack
from samba import ( from samba import (
NTSTATUSError, NTSTATUSError,
ntstatus,
aead_aes_256_cbc_hmac_sha512, aead_aes_256_cbc_hmac_sha512,
arcfour_encrypt, arcfour_encrypt,
) )
from samba.ntstatus import (
NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE
)
from samba import crypto from samba import crypto
from secrets import token_bytes from secrets import token_bytes
# FIXME from collections.abc import Callable
def OpenPolicyFallback( def OpenPolicyFallback(
conn: lsa.lsarpc, # new_lsa_conn: Callable[[], lsa.lsarpc], - FIXME the type doesn't work
# with python version 3.6 (CentOS8, SLES15).
new_lsa_conn,
system_name: str, system_name: str,
in_version: int, in_version: int,
in_revision_info: lsa.revision_info1, in_revision_info: lsa.revision_info1,
sec_qos: bool, sec_qos: bool,
access_mask: int, access_mask: int,
): ):
conn = new_lsa_conn()
attr = lsa.ObjectAttribute() attr = lsa.ObjectAttribute()
if sec_qos: if sec_qos:
qos = lsa.QosInfo() qos = lsa.QosInfo()
@@ -48,26 +51,38 @@ def OpenPolicyFallback(
attr.sec_qos = qos attr.sec_qos = qos
try: open_policy2 = False
out_version, out_rev_info, policy = conn.OpenPolicy3( if in_revision_info is not None:
system_name, try:
attr, out_version, out_rev_info, policy = conn.OpenPolicy3(
access_mask, system_name,
in_version, attr,
in_revision_info access_mask,
) in_version,
except NTSTATUSError as e: in_revision_info
if e.args[0] == NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE: )
out_version = 1 except NTSTATUSError as e:
out_rev_info = lsa.revision_info1() if e.args[0] == ntstatus.NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE:
out_rev_info.revision = 1 open_policy2 = True
out_rev_info.supported_features = 0 if e.args[0] == ntstatus.NT_STATUS_ACCESS_DENIED:
# We need a new connection
conn = new_lsa_conn(basis_connection=conn)
policy = conn.OpenPolicy2(system_name, attr, access_mask) open_policy2 = True
else: else:
raise raise
else:
open_policy2 = True
return out_version, out_rev_info, policy if open_policy2:
out_version = 1
out_rev_info = lsa.revision_info1()
out_rev_info.revision = 1
out_rev_info.supported_features = 0
policy = conn.OpenPolicy2(system_name, attr, access_mask)
return conn, out_version, out_rev_info, policy
def CreateTrustedDomainRelax( def CreateTrustedDomainRelax(

View File

@@ -125,8 +125,13 @@ class DomainTrustCommand(Command):
self.local_creds = local_creds self.local_creds = local_creds
return self.local_server return self.local_server
def new_local_lsa_connection(self): def new_local_lsa_connection(self, basis_connection=None):
return lsa.lsarpc(self.local_binding_string, self.local_lp, self.local_creds) return lsa.lsarpc(
self.local_binding_string,
self.local_lp,
self.local_creds,
basis_connection=basis_connection
)
def new_local_netlogon_connection(self): def new_local_netlogon_connection(self):
return netlogon.netlogon(self.local_binding_string, self.local_lp, self.local_creds) return netlogon.netlogon(self.local_binding_string, self.local_lp, self.local_creds)
@@ -203,13 +208,23 @@ class DomainTrustCommand(Command):
self.remote_creds = remote_creds self.remote_creds = remote_creds
return self.remote_server return self.remote_server
def new_remote_lsa_connection(self): def new_remote_lsa_connection(self, basis_connection=None):
return lsa.lsarpc(self.remote_binding_string, self.local_lp, self.remote_creds) return lsa.lsarpc(
self.remote_binding_string,
self.local_lp,
self.remote_creds,
basis_connection=basis_connection
)
def new_remote_netlogon_connection(self): def new_remote_netlogon_connection(self, basis_connection=None):
return netlogon.netlogon(self.remote_binding_string, self.local_lp, self.remote_creds) return netlogon.netlogon(
self.remote_binding_string,
self.local_lp,
self.remote_creds,
basis_connection=basis_connection
)
def get_lsa_info(self, conn, policy_access): def get_lsa_info(self, conn_fn, policy_access):
in_version = 1 in_version = 1
in_revision_info1 = lsa.revision_info1() in_revision_info1 = lsa.revision_info1()
in_revision_info1.revision = 1 in_revision_info1.revision = 1
@@ -217,9 +232,9 @@ class DomainTrustCommand(Command):
lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER
) )
out_version, out_revision_info1, policy = OpenPolicyFallback( conn, out_version, out_revision_info1, policy = OpenPolicyFallback(
conn, conn_fn,
b''.decode('utf-8'), '',
in_version, in_version,
in_revision_info1, in_revision_info1,
False, False,
@@ -228,7 +243,7 @@ class DomainTrustCommand(Command):
info = conn.QueryInfoPolicy2(policy, lsa.LSA_POLICY_INFO_DNS) info = conn.QueryInfoPolicy2(policy, lsa.LSA_POLICY_INFO_DNS)
return (policy, out_version, out_revision_info1, info) return (conn, policy, out_version, out_revision_info1, info)
def get_netlogon_dc_unc(self, conn, server, domain): def get_netlogon_dc_unc(self, conn, server, domain):
try: try:
@@ -508,19 +523,15 @@ class cmd_domain_trust_show(DomainTrustCommand):
def run(self, domain, sambaopts=None, versionopts=None, localdcopts=None): def run(self, domain, sambaopts=None, versionopts=None, localdcopts=None):
self.setup_local_server(sambaopts, localdcopts) self.setup_local_server(sambaopts, localdcopts)
try:
local_lsa = self.new_local_lsa_connection()
except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to connect lsa server")
try: try:
local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
( (
local_lsa,
local_policy, local_policy,
local_version, local_version,
local_revision_info1, local_revision_info1,
local_lsa_info local_lsa_info
) = self.get_lsa_info(local_lsa, local_policy_access) ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")
@@ -649,19 +660,16 @@ class cmd_domain_trust_modify(DomainTrustCommand):
raise CommandError("modification arguments are required, try --help") raise CommandError("modification arguments are required, try --help")
self.setup_local_server(sambaopts, localdcopts) self.setup_local_server(sambaopts, localdcopts)
try:
local_lsa = self.new_local_lsa_connection()
except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to connect to lsa server")
try: try:
local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
( (
local_lsa,
local_policy, local_policy,
local_version, local_version,
local_revision_info1, local_revision_info1,
local_lsa_info local_lsa_info
) = self.get_lsa_info(local_lsa, local_policy_access) ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")
@@ -908,18 +916,15 @@ class cmd_domain_trust_create(DomainTrustCommand):
remote_trust_info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL remote_trust_info.trust_attributes |= lsa.LSA_TRUST_ATTRIBUTE_TREAT_AS_EXTERNAL
local_server = self.setup_local_server(sambaopts, localdcopts) local_server = self.setup_local_server(sambaopts, localdcopts)
try:
local_lsa = self.new_local_lsa_connection()
except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to connect lsa server")
try: try:
( (
local_lsa,
local_policy, local_policy,
local_version, local_version,
local_revision_info1, local_revision_info1,
local_lsa_info local_lsa_info
) = self.get_lsa_info(local_lsa, local_policy_access) ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")
@@ -933,18 +938,14 @@ class cmd_domain_trust_create(DomainTrustCommand):
except RuntimeError as error: except RuntimeError as error:
raise self.RemoteRuntimeError(self, error, "failed to locate remote server") raise self.RemoteRuntimeError(self, error, "failed to locate remote server")
try:
remote_lsa = self.new_remote_lsa_connection()
except RuntimeError as error:
raise self.RemoteRuntimeError(self, error, "failed to connect lsa server")
try: try:
( (
remote_lsa,
remote_policy, remote_policy,
remote_version, remote_version,
remote_revision_info1, remote_revision_info1,
remote_lsa_info remote_lsa_info
) = self.get_lsa_info(remote_lsa, remote_policy_access) ) = self.get_lsa_info(self.new_remote_lsa_connection, remote_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.RemoteRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.RemoteRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")
@@ -1297,18 +1298,15 @@ class cmd_domain_trust_delete(DomainTrustCommand):
remote_policy_access |= lsa.LSA_POLICY_CREATE_SECRET remote_policy_access |= lsa.LSA_POLICY_CREATE_SECRET
self.setup_local_server(sambaopts, localdcopts) self.setup_local_server(sambaopts, localdcopts)
try:
local_lsa = self.new_local_lsa_connection()
except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to connect lsa server")
try: try:
( (
local_lsa,
local_policy, local_policy,
local_version, local_version,
local_revision_info1, local_revision_info1,
local_lsa_info local_lsa_info
) = self.get_lsa_info(local_lsa, local_policy_access) ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")
@@ -1338,18 +1336,14 @@ class cmd_domain_trust_delete(DomainTrustCommand):
except RuntimeError as error: except RuntimeError as error:
raise self.RemoteRuntimeError(self, error, "failed to locate remote server") raise self.RemoteRuntimeError(self, error, "failed to locate remote server")
try:
remote_lsa = self.new_remote_lsa_connection()
except RuntimeError as error:
raise self.RemoteRuntimeError(self, error, "failed to connect lsa server")
try: try:
( (
remote_lsa,
remote_policy, remote_policy,
remote_version, remote_version,
remote_revision_info1, remote_revision_info1,
remote_lsa_info remote_lsa_info
) = self.get_lsa_info(remote_lsa, remote_policy_access) ) = self.get_lsa_info(self.new_remote_lsa_connection, remote_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.RemoteRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.RemoteRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")
@@ -1450,18 +1444,15 @@ class cmd_domain_trust_validate(DomainTrustCommand):
local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION local_policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
local_server = self.setup_local_server(sambaopts, localdcopts) local_server = self.setup_local_server(sambaopts, localdcopts)
try:
local_lsa = self.new_local_lsa_connection()
except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to connect lsa server")
try: try:
( (
local_lsa,
local_policy, local_policy,
local_version, local_version,
local_revision_info1, local_revision_info1,
local_lsa_info local_lsa_info
) = self.get_lsa_info(local_lsa, local_policy_access) ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")
@@ -1897,11 +1888,12 @@ class cmd_domain_trust_namespaces(DomainTrustCommand):
try: try:
( (
local_lsa,
local_policy, local_policy,
local_version, local_version,
local_revision_info1, local_revision_info1,
local_lsa_info local_lsa_info
) = self.get_lsa_info(local_lsa, local_policy_access) ) = self.get_lsa_info(self.new_local_lsa_connection, local_policy_access)
except RuntimeError as error: except RuntimeError as error:
raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS") raise self.LocalRuntimeError(self, error, "failed to query LSA_POLICY_INFO_DNS")

View File

@@ -35,6 +35,7 @@ from samba.lsa_utils import (
class CreateTrustedDomain(TestCase): class CreateTrustedDomain(TestCase):
smbencrypt = True
def get_user_creds(self): def get_user_creds(self):
c = Credentials() c = Credentials()
@@ -47,26 +48,35 @@ class CreateTrustedDomain(TestCase):
c.set_password(password) c.set_password(password)
return c return c
def _create_trust_relax(self, smbencrypt=True): def new_lsa_conn(self, basis_connection=None):
creds = self.get_user_creds() creds = self.get_user_creds()
if self.smbencrypt:
if smbencrypt:
creds.set_smb_encryption(SMB_ENCRYPTION_REQUIRED) creds.set_smb_encryption(SMB_ENCRYPTION_REQUIRED)
else: else:
creds.set_smb_encryption(SMB_ENCRYPTION_OFF) creds.set_smb_encryption(SMB_ENCRYPTION_OFF)
lp = self.get_loadparm() lp = self.get_loadparm()
binding_string = ( binding_string = (
"ncacn_np:%s" % (samba.tests.env_get_var_value('SERVER')) "ncacn_np:%s" % (samba.tests.env_get_var_value('SERVER'))
) )
lsa_conn = lsa.lsarpc(binding_string, lp, creds)
if smbencrypt: lsa_conn = lsa.lsarpc(
binding_string,
lp,
creds,
basis_connection=basis_connection
)
if self.smbencrypt:
self.assertTrue(lsa_conn.transport_encrypted()) self.assertTrue(lsa_conn.transport_encrypted())
else: else:
self.assertFalse(lsa_conn.transport_encrypted()) self.assertFalse(lsa_conn.transport_encrypted())
return lsa_conn
def _create_trust_relax(self, smbencrypt=True):
self.smbencrypt = smbencrypt
in_version = 1 in_version = 1
in_revision_info1 = lsa.revision_info1() in_revision_info1 = lsa.revision_info1()
in_revision_info1.revision = 1 in_revision_info1.revision = 1
@@ -74,8 +84,13 @@ class CreateTrustedDomain(TestCase):
lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER
) )
out_version, out_revision_info1, pol_handle = OpenPolicyFallback( (
lsa_conn, lsa_conn,
out_version,
out_revision_info1,
pol_handle
) = OpenPolicyFallback(
self.new_lsa_conn,
'', '',
in_version, in_version,
in_revision_info1, in_revision_info1,
@@ -148,14 +163,7 @@ class CreateTrustedDomain(TestCase):
self.assertIsNone(trustdom_handle) self.assertIsNone(trustdom_handle)
def _create_trust_fallback(self): def _create_trust_fallback(self):
creds = self.get_user_creds() self.smbencrypt = True
lp = self.get_loadparm()
binding_string = (
"ncacn_np:%s" % (samba.tests.env_get_var_value('SERVER'))
)
lsa_conn = lsa.lsarpc(binding_string, lp, creds)
in_version = 1 in_version = 1
in_revision_info1 = lsa.revision_info1() in_revision_info1 = lsa.revision_info1()
@@ -164,8 +172,13 @@ class CreateTrustedDomain(TestCase):
lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER
) )
out_version, out_revision_info1, pol_handle = OpenPolicyFallback( (
lsa_conn, lsa_conn,
out_version,
out_revision_info1,
pol_handle
) = OpenPolicyFallback(
self.new_lsa_conn,
'', '',
in_version, in_version,
in_revision_info1, in_revision_info1,

View File

@@ -57,7 +57,6 @@ from samba.crypto import des_crypt_blob_16, md4_hash_blob
from samba.lsa_utils import OpenPolicyFallback, CreateTrustedDomainFallback from samba.lsa_utils import OpenPolicyFallback, CreateTrustedDomainFallback
from samba.dcerpc import ( from samba.dcerpc import (
claims, claims,
dcerpc,
drsblobs, drsblobs,
drsuapi, drsuapi,
krb5ccache, krb5ccache,
@@ -441,7 +440,7 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
return self._drsuapi_connection return self._drsuapi_connection
def get_lsarpc_connection(self): def get_lsarpc_connection(self):
def get_lsa_info(conn, policy_access): def get_lsa_info(conn_fn, policy_access):
in_version = 1 in_version = 1
in_revision_info1 = lsa.revision_info1() in_revision_info1 = lsa.revision_info1()
in_revision_info1.revision = 1 in_revision_info1.revision = 1
@@ -449,9 +448,9 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER
) )
out_version, out_revision_info1, policy = OpenPolicyFallback( conn, out_version, out_revision_info1, policy = OpenPolicyFallback(
conn, conn_fn,
b''.decode('utf-8'), '',
in_version, in_version,
in_revision_info1, in_revision_info1,
False, False,
@@ -460,7 +459,18 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
info = conn.QueryInfoPolicy2(policy, lsa.LSA_POLICY_INFO_DNS) info = conn.QueryInfoPolicy2(policy, lsa.LSA_POLICY_INFO_DNS)
return (policy, out_version, out_revision_info1, info) return (conn, policy, out_version, out_revision_info1, info)
def new_lsa_conn(basis_connection=None):
lp = self.get_lp()
admin_creds = self.get_admin_creds()
return lsa.lsarpc(
self._binding_string,
lp,
admin_creds,
basis_connection=basis_connection
)
def lsarpc_connect(server, lp, creds, ip=None): def lsarpc_connect(server, lp, creds, ip=None):
binding_options = "" binding_options = ""
@@ -474,13 +484,14 @@ class KDCBaseTest(TestCaseInTempDir, RawKerberosTest):
else: else:
binding_string = "ncacn_np:%s[%s]" % (server, binding_options) binding_string = "ncacn_np:%s[%s]" % (server, binding_options)
self._binding_string = binding_string
try: try:
conn = lsa.lsarpc(binding_string, lp, creds)
policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION policy_access = lsa.LSA_POLICY_VIEW_LOCAL_INFORMATION
policy_access |= lsa.LSA_POLICY_TRUST_ADMIN policy_access |= lsa.LSA_POLICY_TRUST_ADMIN
policy_access |= lsa.LSA_POLICY_CREATE_SECRET policy_access |= lsa.LSA_POLICY_CREATE_SECRET
(policy, out_version, out_revision_info1, info) = \ (conn, policy, out_version, out_revision_info1, info) = \
get_lsa_info(conn, policy_access) get_lsa_info(new_lsa_conn, policy_access)
except Exception as e: except Exception as e:
raise RuntimeError("LSARPC connection to %s failed: %s" % (server, e)) raise RuntimeError("LSARPC connection to %s failed: %s" % (server, e))