mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

gp: Ensure centrify crontab user policy performs proper cleanup

This resolves cleanup issues for user and group
centrify compatible policies. It also ensures the
crontab policies use functions from the scripts
policy, to avoid code duplication and simplify
cleanup.

Signed-off-by: David Mulder <dmulder@samba.org>
Reviewed-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
David Mulder 2023-07-25 13:23:10 -06:00 committed by Andrew Bartlett
parent 8cc706c102
commit ab2cda7928
2 changed files with 24 additions and 52 deletions


@ -16,9 +16,11 @@
import os, re
from subprocess import Popen, PIPE
from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier
from hashlib import blake2b
from samba.gp.gpclass import gp_pol_ext, drop_privileges, gp_file_applier, \
gp_misc_applier
from tempfile import NamedTemporaryFile
from samba.gp.gp_scripts_ext import fetch_crontab, install_crontab, \
install_user_crontab
intro = '''
### autogenerated by samba
@ -92,73 +94,44 @@ class gp_centrify_crontab_ext(gp_pol_ext, gp_file_applier):
output[str(self)].append(e.data)
return output
def fetch_crontab(username):
p = Popen(['crontab', '-l', '-u', username], stdout=PIPE, stderr=PIPE)
out, err = p.communicate()
if p.returncode != 0:
raise RuntimeError('Failed to read the crontab: %s' % err)
m = re.findall('%s(.*)%s' % (intro, end), out.decode(), re.DOTALL)
if len(m) == 1:
entries = m[0].strip().split('\n')
else:
entries = []
m = re.findall('(.*)%s.*%s(.*)' % (intro, end), out.decode(), re.DOTALL)
if len(m) == 1:
others = '\n'.join([l.strip() for l in m[0]])
else:
others = out.decode()
return others, entries
class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext, gp_misc_applier):
def unapply(self, guid, attribute, entry):
others, entries = fetch_crontab(self.username)
if entry in entries:
entries.remove(entry)
install_user_crontab(self.username, others, entries)
self.cache_remove_attribute(guid, attribute)
def install_crontab(fname, username):
p = Popen(['crontab', fname, '-u', username], stdout=PIPE, stderr=PIPE)
_, err = p.communicate()
if p.returncode != 0:
raise RuntimeError('Failed to install crontab: %s' % err)
def apply(self, guid, attribute, entry):
old_val = self.cache_get_attribute_value(guid, attribute)
others, entries = fetch_crontab(self.username)
if not old_val or entry not in entries:
entries.append(entry)
install_user_crontab(self.username, others, entries)
self.cache_add_attribute(guid, attribute, entry)
class gp_user_centrify_crontab_ext(gp_centrify_crontab_ext):
def process_group_policy(self, deleted_gpo_list, changed_gpo_list):
for guid, settings in deleted_gpo_list:
self.gp_db.set_guid(guid)
if str(self) in settings:
others, entries = fetch_crontab(self.username)
for attribute, entry in settings[str(self)].items():
if entry in entries:
entries.remove(entry)
self.gp_db.delete(str(self), attribute)
with NamedTemporaryFile() as f:
if len(entries) > 0:
f.write('\n'.join([others, intro,
'\n'.join(entries), end]).encode())
else:
f.write(others.encode())
f.flush()
install_crontab(f.name, self.username)
self.gp_db.commit()
self.unapply(guid, attribute, entry)
for gpo in changed_gpo_list:
if gpo.file_sys_path:
section = \
'Software\\Policies\\Centrify\\UnixSettings\\CrontabEntries'
self.gp_db.set_guid(gpo.name)
pol_file = 'USER/Registry.pol'
path = os.path.join(gpo.file_sys_path, pol_file)
pol_conf = drop_privileges('root', self.parse, path)
if not pol_conf:
continue
attrs = []
for e in pol_conf.entries:
if e.keyname == section and e.data.strip():
attribute = blake2b(e.data.encode()).hexdigest()
old_val = self.gp_db.retrieve(str(self), attribute)
others, entries = fetch_crontab(self.username)
if not old_val or e.data not in entries:
entries.append(e.data)
with NamedTemporaryFile() as f:
f.write('\n'.join([others, intro,
'\n'.join(entries), end]).encode())
f.flush()
install_crontab(f.name, self.username)
self.gp_db.store(str(self), attribute, e.data)
self.gp_db.commit()
attribute = self.generate_attribute(e.data)
attrs.append(attribute)
self.apply(gpo.name, attribute, e.data)
self.clean(gpo.name, keep=attrs)
def rsop(self, gpo):
return super().rsop(gpo, target='USER')
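Review bot: In `process_group_policy`, the new code tests `e.data.strip()` for emptiness but then hashes and passes the raw `e.data` to `apply`, so a policy value with surrounding whitespace or an embedded newline reaches the user's crontab in a form that `entry in entries` may never match again. The removed local `fetch_crontab` stripped the managed block and split it on `'\n'`; if the imported `samba.gp.gp_scripts_ext.fetch_crontab` behaves the same way (it is not visible here), `apply` would append such an entry again on every policy run. In that case `unapply` would also leave the job scheduled while `cache_remove_attribute` drops the only record of it; indentation is lost on this page, so the placement of that call outside the `if` is inferred. Normalising once before use would keep cleanup reliable: `entry = e.data.strip()`, skip and log when `'\n' in entry`, then pass `entry` to both `generate_attribute` and `apply`.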


@ -1 +0,0 @@
^samba.tests.gpo.samba.tests.gpo.GPOTests.test_gp_user_centrify_crontab_ext