diff --git a/python/samba/lsa_utils.py b/python/samba/lsa_utils.py index b4df0fa5bb8..2feeac02672 100644 --- a/python/samba/lsa_utils.py +++ b/python/samba/lsa_utils.py @@ -18,10 +18,54 @@ from samba.dcerpc import lsa, drsblobs from samba.ndr import ndr_pack -from samba import arcfour_encrypt, string_to_byte_array +from samba import NTSTATUSError, arcfour_encrypt, string_to_byte_array +from samba.ntstatus import ( + NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE +) import random from samba import crypto + +def OpenPolicyFallback( + conn: lsa.lsarpc, + system_name: str, + in_version: int, + in_revision_info: lsa.revision_info1, + sec_qos: bool = False, + access_mask: int = 0, +): + attr = lsa.ObjectAttribute() + if sec_qos: + qos = lsa.QosInfo() + qos.len = 0xc + qos.impersonation_level = 2 + qos.context_mode = 1 + qos.effective_only = 0 + + attr.sec_qos = qos + + try: + out_version, out_rev_info, policy = conn.OpenPolicy3( + system_name, + attr, + access_mask, + in_version, + in_revision_info + ) + except NTSTATUSError as e: + if e.args[0] == NT_STATUS_RPC_PROCNUM_OUT_OF_RANGE: + out_version = 1 + out_rev_info = lsa.revision_info1() + out_rev_info.revision = 1 + out_rev_info.supported_features = 0 + + policy = conn.OpenPolicy2(system_name, attr, access_mask) + else: + raise + + return out_version, out_rev_info, policy + + def CreateTrustedDomainRelax(lsaconn, policy, trust_info, mask, in_blob, out_blob): def generate_AuthInfoInternal(session_key, incoming=None, outgoing=None): diff --git a/python/samba/tests/dcerpc/createtrustrelax.py b/python/samba/tests/dcerpc/createtrustrelax.py index ea4b67dbb07..fbd99484797 100644 --- a/python/samba/tests/dcerpc/createtrustrelax.py +++ b/python/samba/tests/dcerpc/createtrustrelax.py @@ -24,7 +24,7 @@ import samba from samba.tests import TestCase from samba.dcerpc import lsa, security, drsblobs from samba.credentials import Credentials, SMB_ENCRYPTION_REQUIRED, SMB_ENCRYPTION_OFF -from samba.lsa_utils import CreateTrustedDomainRelax +from samba.lsa_utils import OpenPolicyFallback, CreateTrustedDomainRelax class CreateTrustedDomainRelaxTest(TestCase): @@ -57,12 +57,20 @@ class CreateTrustedDomainRelaxTest(TestCase): else: self.assertFalse(lsa_conn.transport_encrypted()) - objectAttr = lsa.ObjectAttribute() - objectAttr.sec_qos = lsa.QosInfo() + in_version = 1 + in_revision_info1 = lsa.revision_info1() + in_revision_info1.revision = 1 + in_revision_info1.supported_features = ( + lsa.LSA_FEATURE_TDO_AUTH_INFO_AES_CIPHER + ) - pol_handle = lsa_conn.OpenPolicy2('', - objectAttr, - security.SEC_FLAG_MAXIMUM_ALLOWED) + out_version, out_revision_info1, pol_handle = OpenPolicyFallback( + lsa_conn, + '', + in_version, + in_revision_info1, + access_mask=security.SEC_FLAG_MAXIMUM_ALLOWED + ) self.assertIsNotNone(pol_handle) name = lsa.String()