mirror of
https://github.com/samba-team/samba.git
synced 2025-12-16 00:23:52 +03:00
samba-tool: rewrite dsacl.py to use the new sd_utils helpers
Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Andrew Bartlett <abartlet@samba.org> Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz> Autobuild-User(master): Stefan Metzmacher <metze@samba.org> Autobuild-Date(master): Wed Mar 22 15:57:15 UTC 2023 on atb-devel-224
This commit is contained in:
@@ -17,6 +17,7 @@
|
||||
#
|
||||
|
||||
import samba.getopt as options
|
||||
from samba import sd_utils
|
||||
from samba.dcerpc import security
|
||||
from samba.samdb import SamDB
|
||||
from samba.ndr import ndr_unpack, ndr_pack
|
||||
@@ -42,38 +43,6 @@ from samba.netcmd import (
|
||||
Option,
|
||||
)
|
||||
|
||||
def find_trustee_sid(samdb, trusteedn):
|
||||
res = samdb.search(base=trusteedn, expression="(objectClass=*)",
|
||||
scope=SCOPE_BASE)
|
||||
assert(len(res) == 1)
|
||||
return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
|
||||
|
||||
|
||||
def modify_descriptor(samdb, object_dn, desc, controls=None):
|
||||
assert(isinstance(desc, security.descriptor))
|
||||
m = ldb.Message()
|
||||
m.dn = ldb.Dn(samdb, object_dn)
|
||||
m["nTSecurityDescriptor"] = ldb.MessageElement(
|
||||
(ndr_pack(desc)), ldb.FLAG_MOD_REPLACE,
|
||||
"nTSecurityDescriptor")
|
||||
samdb.modify(m)
|
||||
|
||||
|
||||
def read_descriptor(samdb, object_dn):
|
||||
res = samdb.search(base=object_dn, scope=SCOPE_BASE,
|
||||
attrs=["nTSecurityDescriptor"])
|
||||
# we should theoretically always have an SD
|
||||
assert(len(res) == 1)
|
||||
desc = res[0]["nTSecurityDescriptor"][0]
|
||||
return ndr_unpack(security.descriptor, desc)
|
||||
|
||||
|
||||
def get_domain_sid(samdb):
|
||||
res = samdb.search(base=samdb.domain_dn(),
|
||||
expression="(objectClass=*)", scope=SCOPE_BASE)
|
||||
return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
|
||||
|
||||
|
||||
class cmd_dsacl_base(Command):
|
||||
"""Base class for DSACL commands."""
|
||||
|
||||
@@ -85,9 +54,8 @@ class cmd_dsacl_base(Command):
|
||||
"versionopts": options.VersionOptions,
|
||||
}
|
||||
|
||||
def print_acl(self, samdb, object_dn, prefix=''):
|
||||
desc = read_descriptor(samdb, object_dn)
|
||||
desc_sddl = desc.as_sddl(get_domain_sid(samdb))
|
||||
def print_acl(self, sd_helper, object_dn, prefix=''):
|
||||
desc_sddl = sd_helper.get_sd_as_sddl(object_dn)
|
||||
self.outf.write("%sdescriptor for %s:\n" % (prefix, object_dn))
|
||||
self.outf.write(desc_sddl + "\n")
|
||||
|
||||
@@ -124,26 +92,21 @@ class cmd_dsacl_set(cmd_dsacl_base):
|
||||
type="string"),
|
||||
]
|
||||
|
||||
def add_ace(self, samdb, object_dn, new_ace):
|
||||
def find_trustee_sid(self, samdb, trusteedn):
|
||||
res = samdb.search(base=trusteedn, expression="(objectClass=*)",
|
||||
scope=SCOPE_BASE)
|
||||
assert(len(res) == 1)
|
||||
return ndr_unpack(security.dom_sid, res[0]["objectSid"][0])
|
||||
|
||||
def add_ace(self, sd_helper, object_dn, new_ace):
|
||||
"""Add new ace explicitly."""
|
||||
desc = read_descriptor(samdb, object_dn)
|
||||
new_ace = security.descriptor.from_sddl("D:" + new_ace, get_domain_sid(samdb))
|
||||
new_ace_list = re.findall(r"\(.*?\)",new_ace.as_sddl())
|
||||
for new_ace in new_ace_list:
|
||||
desc_sddl = desc.as_sddl(get_domain_sid(samdb))
|
||||
# TODO add bindings for descriptor manipulation and get rid of this
|
||||
desc_aces = re.findall(r"\(.*?\)", desc_sddl)
|
||||
for ace in desc_aces:
|
||||
if ("ID" in ace):
|
||||
desc_sddl = desc_sddl.replace(ace, "")
|
||||
if new_ace in desc_sddl:
|
||||
continue
|
||||
if desc_sddl.find("(") >= 0:
|
||||
desc_sddl = desc_sddl[:desc_sddl.index("(")] + new_ace + desc_sddl[desc_sddl.index("("):]
|
||||
else:
|
||||
desc_sddl = desc_sddl + new_ace
|
||||
desc = security.descriptor.from_sddl(desc_sddl, get_domain_sid(samdb))
|
||||
modify_descriptor(samdb, object_dn, desc)
|
||||
ai,ii = sd_helper.dacl_prepend_aces(object_dn, new_ace)
|
||||
for ace in ii:
|
||||
sddl = ace.as_sddl(sd_helper.domain_sid)
|
||||
self.outf.write("WARNING: ignored INHERITED_ACE (%s).\n" % sddl)
|
||||
for ace in ai:
|
||||
sddl = ace.as_sddl(sd_helper.domain_sid)
|
||||
self.outf.write("WARNING: (%s) was already found in the current security descriptor.\n" % sddl)
|
||||
|
||||
def run(self, car, action, objectdn, trusteedn, sddl,
|
||||
H=None, credopts=None, sambaopts=None, versionopts=None):
|
||||
@@ -156,6 +119,7 @@ class cmd_dsacl_set(cmd_dsacl_base):
|
||||
|
||||
samdb = SamDB(url=H, session_info=system_session(),
|
||||
credentials=creds, lp=lp)
|
||||
sd_helper = sd_utils.SDUtils(samdb)
|
||||
cars = {'change-rid': GUID_DRS_CHANGE_RID_MASTER,
|
||||
'change-pdc': GUID_DRS_CHANGE_PDC,
|
||||
'change-infrastructure': GUID_DRS_CHANGE_INFR_MASTER,
|
||||
@@ -170,7 +134,7 @@ class cmd_dsacl_set(cmd_dsacl_base):
|
||||
'repl-sync': GUID_DRS_REPL_SYNCRONIZE,
|
||||
'ro-repl-secret-sync': GUID_DRS_RO_REPL_SECRET_SYNC,
|
||||
}
|
||||
sid = find_trustee_sid(samdb, trusteedn)
|
||||
sid = self.find_trustee_sid(samdb, trusteedn)
|
||||
if sddl:
|
||||
new_ace = sddl
|
||||
elif action == "allow":
|
||||
@@ -180,9 +144,9 @@ class cmd_dsacl_set(cmd_dsacl_base):
|
||||
else:
|
||||
raise CommandError("Wrong argument '%s'!" % action)
|
||||
|
||||
self.print_acl(samdb, objectdn, prefix='old ')
|
||||
self.add_ace(samdb, objectdn, new_ace)
|
||||
self.print_acl(samdb, objectdn, prefix='new ')
|
||||
self.print_acl(sd_helper, objectdn, prefix='old ')
|
||||
self.add_ace(sd_helper, objectdn, new_ace)
|
||||
self.print_acl(sd_helper, objectdn, prefix='new ')
|
||||
|
||||
|
||||
class cmd_dsacl_get(cmd_dsacl_base):
|
||||
@@ -202,7 +166,8 @@ class cmd_dsacl_get(cmd_dsacl_base):
|
||||
|
||||
samdb = SamDB(url=H, session_info=system_session(),
|
||||
credentials=creds, lp=lp)
|
||||
self.print_acl(samdb, objectdn)
|
||||
sd_helper = sd_utils.SDUtils(samdb)
|
||||
self.print_acl(sd_helper, objectdn)
|
||||
|
||||
|
||||
class cmd_dsacl_delete(cmd_dsacl_base):
|
||||
@@ -226,23 +191,21 @@ class cmd_dsacl_delete(cmd_dsacl_base):
|
||||
|
||||
samdb = SamDB(url=H, session_info=system_session(),
|
||||
credentials=creds, lp=lp)
|
||||
sd_helper = sd_utils.SDUtils(samdb)
|
||||
|
||||
self.print_acl(samdb, objectdn, prefix='old ')
|
||||
self.delete_ace(samdb, objectdn, sddl)
|
||||
self.print_acl(samdb, objectdn, prefix='new ')
|
||||
self.print_acl(sd_helper, objectdn, prefix='old ')
|
||||
self.delete_ace(sd_helper, objectdn, sddl)
|
||||
self.print_acl(sd_helper, objectdn, prefix='new ')
|
||||
|
||||
def delete_ace(self, samdb, object_dn, delete_aces):
|
||||
def delete_ace(self, sd_helper, object_dn, delete_aces):
|
||||
"""Delete ace explicitly."""
|
||||
desc = read_descriptor(samdb, object_dn)
|
||||
domsid = get_domain_sid(samdb)
|
||||
delete_aces = security.descriptor.from_sddl("D:" + delete_aces, domsid).dacl.aces
|
||||
for ace in delete_aces:
|
||||
if ace in desc.dacl.aces:
|
||||
desc.dacl_del_ace(ace)
|
||||
else:
|
||||
sddl = ace.as_sddl(domsid)
|
||||
self.outf.write("WARNING: (%s) was not found in the current security descriptor.\n" % sddl)
|
||||
modify_descriptor(samdb, object_dn, desc)
|
||||
di,ii = sd_helper.dacl_delete_aces(object_dn, delete_aces)
|
||||
for ace in ii:
|
||||
sddl = ace.as_sddl(sd_helper.domain_sid)
|
||||
self.outf.write("WARNING: ignored INHERITED_ACE (%s).\n" % sddl)
|
||||
for ace in di:
|
||||
sddl = ace.as_sddl(sd_helper.domain_sid)
|
||||
self.outf.write("WARNING: (%s) was not found in the current security descriptor.\n" % sddl)
|
||||
|
||||
|
||||
class cmd_dsacl(SuperCommand):
|
||||
|
||||
Reference in New Issue
Block a user