1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-22 22:04:08 +03:00

pysmbd: make "session_info" arg to py_smbd_get_nt_acl() mandatory

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
Ralph Boehme 2019-12-17 14:52:49 +01:00
parent 437af4d079
commit 9b2c415d2c
7 changed files with 62 additions and 61 deletions

View File

@ -175,11 +175,11 @@ class cmd_ntacl_get(Command):
acl = getntacl(lp,
file,
system_session_unix(),
xattr_backend,
eadb_file,
direct_db_access=use_ntvfs,
service=service,
session_info=system_session_unix())
service=service)
if as_sddl:
self.outf.write(acl.as_sddl(domain_sid) + "\n")
else:
@ -281,11 +281,11 @@ class cmd_ntacl_changedomsid(Command):
try:
acl = getntacl(lp,
file,
system_session_unix(),
xattr_backend,
eadb_file,
direct_db_access=use_ntvfs,
service=service,
session_info=system_session_unix())
service=service)
except Exception as e:
raise CommandError("Could not get acl for %s: %s" % (file, e))

View File

@ -99,11 +99,11 @@ def getdosinfo(lp, file):
def getntacl(lp,
file,
session_info,
backend=None,
eadbfile=None,
direct_db_access=True,
service=None,
session_info=None):
service=None):
if direct_db_access:
(backend_obj, dbname) = checkset_backend(lp, backend, eadbfile)
if dbname is not None:
@ -131,8 +131,8 @@ def getntacl(lp,
else:
return smbd.get_nt_acl(file,
SECURITY_SECINFO_FLAGS,
service=service,
session_info=session_info)
session_info,
service=service)
def setntacl(lp, file, sddl, domsid, session_info,
@ -449,12 +449,12 @@ class NtaclsHelper:
self.use_ntvfs = "smb" in self.lp.get("server services")
def getntacl(self, path, as_sddl=False, direct_db_access=None):
def getntacl(self, path, session_info, as_sddl=False, direct_db_access=None):
if direct_db_access is None:
direct_db_access = self.use_ntvfs
ntacl_sd = getntacl(
self.lp, path,
self.lp, path, session_info,
direct_db_access=direct_db_access,
service=self.service)
@ -565,7 +565,7 @@ def backup_offline(src_service_path, dest_tarfile_path, samdb_conn, smb_conf_pat
dst = os.path.join(dst_dirpath, dirname)
# mkdir with metadata
smbd.mkdir(dst, service)
ntacl_sddl_str = ntacls_helper.getntacl(src, as_sddl=True)
ntacl_sddl_str = ntacls_helper.getntacl(src, session_info, as_sddl=True)
_create_ntacl_file(dst, ntacl_sddl_str)
# create files and NTACL file, then copy data
@ -574,7 +574,7 @@ def backup_offline(src_service_path, dest_tarfile_path, samdb_conn, smb_conf_pat
dst = os.path.join(dst_dirpath, filename)
# create an empty file with metadata
smbd.create_file(dst, service)
ntacl_sddl_str = ntacls_helper.getntacl(src, as_sddl=True)
ntacl_sddl_str = ntacls_helper.getntacl(src, session_info, as_sddl=True)
_create_ntacl_file(dst, ntacl_sddl_str)
# now put data in

View File

@ -1790,14 +1790,15 @@ def acl_type(direct_db_access):
def check_dir_acl(path, acl, lp, domainsid, direct_db_access):
fsacl = getntacl(lp, path, direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
session_info = system_session_unix()
fsacl = getntacl(lp, path, session_info, direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
fsacl_sddl = fsacl.as_sddl(domainsid)
if fsacl_sddl != acl:
raise ProvisioningError('%s ACL on GPO directory %s %s does not match expected value %s from GPO object' % (acl_type(direct_db_access), path, fsacl_sddl, acl))
for root, dirs, files in os.walk(path, topdown=False):
for name in files:
fsacl = getntacl(lp, os.path.join(root, name),
fsacl = getntacl(lp, os.path.join(root, name), session_info,
direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
if fsacl is None:
raise ProvisioningError('%s ACL on GPO file %s not found!' %
@ -1808,7 +1809,7 @@ def check_dir_acl(path, acl, lp, domainsid, direct_db_access):
raise ProvisioningError('%s ACL on GPO file %s %s does not match expected value %s from GPO object' % (acl_type(direct_db_access), os.path.join(root, name), fsacl_sddl, acl))
for name in dirs:
fsacl = getntacl(lp, os.path.join(root, name),
fsacl = getntacl(lp, os.path.join(root, name), session_info,
direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
if fsacl is None:
raise ProvisioningError('%s ACL on GPO directory %s not found!'
@ -1834,7 +1835,8 @@ def check_gpos_acl(sysvol, dnsdomain, domainsid, domaindn, samdb, lp,
# Set ACL for GPO root folder
root_policy_path = os.path.join(sysvol, dnsdomain, "Policies")
fsacl = getntacl(lp, root_policy_path,
session_info = system_session_unix()
fsacl = getntacl(lp, root_policy_path, session_info,
direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
if fsacl is None:
raise ProvisioningError('DB ACL on policy root %s %s not found!' % (acl_type(direct_db_access), root_policy_path))
@ -1887,10 +1889,11 @@ def checksysvolacl(samdb, netlogon, sysvol, domainsid, dnsdomain, domaindn,
raise ProvisioningError('Realm as seen by pdb_samba_dsdb [%s] does not match Realm as seen by the provision script [%s]!' % (domain_info["dns_domain"].upper(), dnsdomain.upper()))
# Ensure we can read this directly, and via the smbd VFS
session_info = system_session_unix()
for direct_db_access in [True, False]:
# Check the SYSVOL_ACL on the sysvol folder and subfolder (first level)
for dir_path in [os.path.join(sysvol, dnsdomain), netlogon]:
fsacl = getntacl(lp, dir_path, direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
fsacl = getntacl(lp, dir_path, session_info, direct_db_access=direct_db_access, service=SYSVOL_SERVICE)
if fsacl is None:
raise ProvisioningError('%s ACL on sysvol directory %s not found!' % (acl_type(direct_db_access), dir_path))
fsacl_sddl = fsacl.as_sddl(domainsid)

View File

@ -54,7 +54,7 @@ class NtaclsTests(TestCaseInTempDir):
open(self.tempf, 'w').write("empty")
lp.set("posix:eadb", os.path.join(self.tempdir, "eadbtest.tdb"))
setntacl(lp, self.tempf, NTACL_SDDL, DOMAIN_SID, self.session_info)
facl = getntacl(lp, self.tempf)
facl = getntacl(lp, self.tempf, self.session_info)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(facl.as_sddl(anysid), NTACL_SDDL)
os.unlink(os.path.join(self.tempdir, "eadbtest.tdb"))
@ -64,7 +64,7 @@ class NtaclsTests(TestCaseInTempDir):
open(self.tempf, 'w').write("empty")
setntacl(lp, self.tempf, NTACL_SDDL, DOMAIN_SID, self.session_info, "tdb",
os.path.join(self.tempdir, "eadbtest.tdb"))
facl = getntacl(lp, self.tempf, "tdb", os.path.join(
facl = getntacl(lp, self.tempf, self.session_info, "tdb", os.path.join(
self.tempdir, "eadbtest.tdb"))
domsid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(facl.as_sddl(domsid), NTACL_SDDL)

View File

@ -152,10 +152,10 @@ class NtaclsBackupRestoreTests(SmbdBaseTests):
sd0 = self.smb_helper.get_acl(file_name, as_sddl=True)
sd1 = self.ntacls_helper.getntacl(
file_path, as_sddl=True, direct_db_access=False)
file_path, system_session_unix(), as_sddl=True, direct_db_access=False)
sd2 = self.ntacls_helper.getntacl(
file_path, as_sddl=True, direct_db_access=True)
file_path, system_session_unix(), as_sddl=True, direct_db_access=True)
self.assertEquals(sd0, sd1)
self.assertEquals(sd1, sd2)

View File

@ -76,7 +76,7 @@ class PosixAclMappingTests(SmbdBaseTests):
acl = ACL
setntacl(self.lp, self.tempf, acl, DOM_SID,
self.get_session_info(), use_ntvfs=True)
facl = getntacl(self.lp, self.tempf, direct_db_access=True)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=True)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(facl.as_sddl(anysid), acl)
@ -90,7 +90,7 @@ class PosixAclMappingTests(SmbdBaseTests):
# However, this only asks the xattr
self.assertRaises(
TypeError, getntacl, self.lp, self.tempf, direct_db_access=True)
TypeError, getntacl, self.lp, self.tempf, self.get_session_info(), direct_db_access=True)
def test_setntacl_invalidate_getntacl(self):
acl = ACL
@ -103,7 +103,7 @@ class PosixAclMappingTests(SmbdBaseTests):
self.tempf, "system.fake_access_acl", b"")
# however, as this is direct DB access, we do not notice it
facl = getntacl(self.lp, self.tempf, direct_db_access=True)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=True)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(acl, facl.as_sddl(anysid))
@ -118,7 +118,7 @@ class PosixAclMappingTests(SmbdBaseTests):
self.tempf, "system.fake_access_acl", b"")
# the hash would break, and we return an ACL based only on the mode, except we set the ACL using the 'ntvfs' mode that doesn't include a hash
facl = getntacl(self.lp, self.tempf)
facl = getntacl(self.lp, self.tempf, self.get_session_info())
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(acl, facl.as_sddl(anysid))
@ -135,7 +135,7 @@ class PosixAclMappingTests(SmbdBaseTests):
self.tempf, "system.fake_access_acl", b"")
# the hash will break, and we return an ACL based only on the mode
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid))
@ -143,7 +143,7 @@ class PosixAclMappingTests(SmbdBaseTests):
acl = ACL
setntacl(self.lp, self.tempf, acl, DOM_SID,
self.get_session_info(), use_ntvfs=True)
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(facl.as_sddl(anysid), acl)
@ -151,7 +151,7 @@ class PosixAclMappingTests(SmbdBaseTests):
acl = ACL
setntacl(self.lp, self.tempf, acl, DOM_SID,
self.get_session_info(), use_ntvfs=False)
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(facl.as_sddl(anysid), acl)
@ -162,7 +162,7 @@ class PosixAclMappingTests(SmbdBaseTests):
self.get_session_info(), use_ntvfs=False)
# This invalidates the hash of the NT acl just set because there is a hook in the posix ACL set code
smbd.set_simple_acl(self.tempf, 0o640, self.get_session_info())
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid))
@ -178,7 +178,7 @@ class PosixAclMappingTests(SmbdBaseTests):
smbd.set_simple_acl(self.tempf, 0o640, self.get_session_info(), BA_gid)
# This should re-calculate an ACL based on the posix details
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(simple_acl_from_posix, facl.as_sddl(anysid))
@ -186,7 +186,7 @@ class PosixAclMappingTests(SmbdBaseTests):
acl = "O:DAG:DUD:P(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;EA)(A;OICIIO;0x001f01ff;;;CO)(A;OICI;0x001f01ff;;;DA)(A;OICI;0x001f01ff;;;SY)(A;OICI;0x001200a9;;;AU)(A;OICI;0x001200a9;;;ED)S:AI(OU;CIIDSA;WP;f30e3bbe-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)(OU;CIIDSA;WP;f30e3bbf-9ff0-11d1-b603-0000f80367c1;bf967aa5-0de6-11d0-a285-00aa003049e2;WD)"
setntacl(self.lp, self.tempf, acl, DOM_SID,
self.get_session_info(), use_ntvfs=False)
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
domsid = security.dom_sid(DOM_SID)
self.assertEquals(facl.as_sddl(domsid), acl)
@ -194,7 +194,7 @@ class PosixAclMappingTests(SmbdBaseTests):
acl = ACL
setntacl(self.lp, self.tempf, acl, DOM_SID,
self.get_session_info(), use_ntvfs=False)
facl = getntacl(self.lp, self.tempf)
facl = getntacl(self.lp, self.tempf, self.get_session_info())
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(facl.as_sddl(anysid), acl)
posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS)
@ -202,14 +202,14 @@ class PosixAclMappingTests(SmbdBaseTests):
def test_setposixacl_getntacl(self):
smbd.set_simple_acl(self.tempf, 0o750, self.get_session_info())
# We don't expect the xattr to be filled in in this case
self.assertRaises(TypeError, getntacl, self.lp, self.tempf)
self.assertRaises(TypeError, getntacl, self.lp, self.tempf, self.get_session_info())
def test_setposixacl_getntacl_smbd(self):
s4_passdb = passdb.PDB(self.lp.get("passdb backend"))
group_SID = s4_passdb.gid_to_sid(os.stat(self.tempf).st_gid)
user_SID = s4_passdb.uid_to_sid(os.stat(self.tempf).st_uid)
smbd.set_simple_acl(self.tempf, 0o640, self.get_session_info())
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
acl = "O:%sG:%sD:(A;;0x001f019f;;;%s)(A;;0x00120089;;;%s)(A;;;;;WD)" % (user_SID, group_SID, user_SID, group_SID)
anysid = security.dom_sid(security.SID_NT_SELF)
self.assertEquals(acl, facl.as_sddl(anysid))
@ -226,7 +226,7 @@ class PosixAclMappingTests(SmbdBaseTests):
self.assertEquals(SO_type, idmap.ID_TYPE_BOTH)
smbd.chown(self.tempdir, BA_id, SO_id, self.get_session_info())
smbd.set_simple_acl(self.tempdir, 0o750, self.get_session_info())
facl = getntacl(self.lp, self.tempdir, direct_db_access=False)
facl = getntacl(self.lp, self.tempdir, self.get_session_info(), direct_db_access=False)
acl = "O:BAG:SOD:(A;;0x001f01ff;;;BA)(A;;0x001200a9;;;SO)(A;;;;;WD)(A;OICIIO;0x001f01ff;;;CO)(A;OICIIO;0x001200a9;;;CG)(A;OICIIO;0x001200a9;;;WD)"
anysid = security.dom_sid(security.SID_NT_SELF)
@ -240,7 +240,7 @@ class PosixAclMappingTests(SmbdBaseTests):
user_SID = s4_passdb.uid_to_sid(os.stat(self.tempf).st_uid)
self.assertEquals(BA_type, idmap.ID_TYPE_BOTH)
smbd.set_simple_acl(self.tempf, 0o640, self.get_session_info(), BA_gid)
facl = getntacl(self.lp, self.tempf, direct_db_access=False)
facl = getntacl(self.lp, self.tempf, self.get_session_info(), direct_db_access=False)
domsid = passdb.get_global_sam_sid()
acl = "O:%sG:%sD:(A;;0x001f019f;;;%s)(A;;0x00120089;;;BA)(A;;0x00120089;;;%s)(A;;;;;WD)" % (user_SID, group_SID, user_SID, group_SID)
anysid = security.dom_sid(security.SID_NT_SELF)
@ -312,7 +312,7 @@ class PosixAclMappingTests(SmbdBaseTests):
session_info = self.get_session_info(domsid)
setntacl(self.lp, self.tempf, acl, str(domsid),
session_info, use_ntvfs=False)
facl = getntacl(self.lp, self.tempf)
facl = getntacl(self.lp, self.tempf, session_info)
self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS)
@ -456,7 +456,7 @@ class PosixAclMappingTests(SmbdBaseTests):
session_info = self.get_session_info(domsid)
setntacl(self.lp, self.tempdir, acl, str(domsid),
session_info, use_ntvfs=False)
facl = getntacl(self.lp, self.tempdir)
facl = getntacl(self.lp, self.tempdir, session_info)
self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS)
@ -549,7 +549,7 @@ class PosixAclMappingTests(SmbdBaseTests):
session_info = self.get_session_info(domsid)
setntacl(self.lp, self.tempdir, acl, str(domsid),
session_info, use_ntvfs=False)
facl = getntacl(self.lp, self.tempdir)
facl = getntacl(self.lp, self.tempdir, session_info)
self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempdir, smb_acl.SMB_ACL_TYPE_ACCESS)
@ -655,7 +655,7 @@ class PosixAclMappingTests(SmbdBaseTests):
session_info = self.get_session_info(domsid)
setntacl(self.lp, self.tempf, acl, str(domsid),
session_info, use_ntvfs=False)
facl = getntacl(self.lp, self.tempf)
facl = getntacl(self.lp, self.tempf, session_info)
self.assertEquals(facl.as_sddl(domsid), acl)
posix_acl = smbd.get_sys_acl(self.tempf, smb_acl.SMB_ACL_TYPE_ACCESS)

View File

@ -757,8 +757,8 @@ static PyObject *py_smbd_get_nt_acl(PyObject *self, PyObject *args, PyObject *kw
const char * const kwnames[] = {
"fname",
"security_info_wanted",
"service",
"session_info",
"service",
NULL
};
char *fname, *service = NULL;
@ -774,34 +774,32 @@ static PyObject *py_smbd_get_nt_acl(PyObject *self, PyObject *args, PyObject *kw
ret = PyArg_ParseTupleAndKeywords(args,
kwargs,
"si|zO",
"siO|z",
discard_const_p(char *, kwnames),
&fname,
&security_info_wanted,
&service,
&py_session);
&py_session,
&service);
if (!ret) {
TALLOC_FREE(frame);
return NULL;
}
if (py_session != Py_None) {
if (!py_check_dcerpc_type(py_session,
"samba.dcerpc.auth",
"session_info")) {
TALLOC_FREE(frame);
return NULL;
}
session_info = pytalloc_get_type(py_session,
struct auth_session_info);
if (!session_info) {
PyErr_Format(
PyExc_TypeError,
"Expected auth_session_info for "
"session_info argument got %s",
pytalloc_get_name(py_session));
return NULL;
}
if (!py_check_dcerpc_type(py_session,
"samba.dcerpc.auth",
"session_info")) {
TALLOC_FREE(frame);
return NULL;
}
session_info = pytalloc_get_type(py_session,
struct auth_session_info);
if (session_info == NULL) {
PyErr_Format(
PyExc_TypeError,
"Expected auth_session_info for "
"session_info argument got %s",
pytalloc_get_name(py_session));
return NULL;
}
conn = get_conn_tos(service, session_info);