1
0
mirror of https://github.com/altlinux/gpupdate.git synced 2025-03-21 18:50:38 +03:00

Introduced functionality to work without DC

This commit is contained in:
Игорь Чудов 2019-11-26 17:47:16 +04:00
parent bf53397320
commit e9bca5e240
Signed by untrusted user: nir
GPG Key ID: 0F3883600CAE7AAC
4 changed files with 209 additions and 146 deletions

View File

@ -1,5 +1,8 @@
import frontend.appliers
import logging
from xml.etree import ElementTree
from samba.gp_parse.gp_pol import GPPolParser
class applier_frontend:
def __init__(self, regobj):
@ -8,6 +11,31 @@ class applier_frontend:
def apply(self):
pass
class entry:
def __init__(self, e_keyname, e_valuename, e_type, e_data):
self.keyname = e_keyname
self.valuename = e_valuename
self.type = e_type
self.data = e_data
def preg2entries(preg_obj):
entries = []
for elem in prej_obj.entries:
entry_obj = entry(elem.keyname, elem.valuename, elem.type, elem.data)
entries.append(entry_obj)
return entries
def load_xml_preg(xml_path):
'''
Parse PReg file and return its preg object
'''
logging.info('Loading PReg from XML: {}'.format(xml_path))
gpparser = GPPolParser()
xml_root = ElementTree.parse(xml_path).getroot()
gpparser.load_xml(xml_root)
gpparser.pol_file.__ndr_print__()
return gpparser.pol_file
class control_applier(applier_frontend):
_registry_branch = 'Software\\BaseALT\\Policies\\Control'
@ -56,28 +84,31 @@ class control_applier(applier_frontend):
polfile.num_entries = len(self.control_settings)
polfile.entries = self.control_settings
print(polfile.__ndr_print__())
policy_writer = GPPolParser()
policy_writer.pol_file = polfile
policy_writer.write_xml('test_reg.xml')
policy_writer.write_binary('test_reg.pol')
class polkit_applier(applier_frontend):
__registry_branch = ''
_registry_branch = 'Software\\Policies\\Microsoft\\Windows\\RemovableStorageDevices'
__policy_map = {
'Deny_All': ['99-gpoa_disk_permissions', args]
'Deny_All': ['99-gpoa_disk_permissions', { 'Deny_All': 0 }]
}
def __init__(self, polfiles):
self.polparsers = polfiles
self.polkit_settings = self._get_policies(self.polparsers)
self.polkit_settings = self._get_policies()
self.policies = []
for setting in self.polkit_settings:
if setting.keyname in __policy_map.keys():
try:
self.policies.append(appliers.polkit(__policy_map[setting.keyname][0], __policy_map[setting.keyname][1]))
except:
logging.info('Unable to work with control: {}'.format(setting.valuename))
if setting.valuename in self.__policy_map.keys() and setting.keyname == self._registry_branch:
logging.info('Found key: {}, file: {} and value: {}'.format(setting.keyname, self.__policy_map[setting.valuename][0], self.__policy_map[setting.valuename][1]))
#try:
self.__policy_map[setting.valuename][1][setting.valuename] = setting.data
self.policies.append(appliers.polkit(self.__policy_map[setting.valuename][0], self.__policy_map[setting.valuename][1]))
#except Exception as exc:
# print(exc)
# logging.info('Unable to work with PolicyKit setting: {}'.format(setting.valuename))
#for e in polfile.pol_file.entries:
# print('{}:{}:{}:{}:{}'.format(e.type, e.data, e.valuename, e.keyname))
@ -90,10 +121,10 @@ class polkit_applier(applier_frontend):
for entry in parser.entries:
if entry.keyname == self._registry_branch:
policies.append(entry)
logging.info('Found control setting: {}'.format(entry.valuename))
logging.info('Found PolicyKit setting: {}'.format(entry.valuename))
else:
# Property names are taken from python/samba/gp_parse/gp_pol.py
logging.info('Dropped control setting: {}\\{}'.format(entry.keyname, entry.valuename))
logging.info('Dropped setting: {}\\{}'.format(entry.keyname, entry.valuename))
return policies
def apply(self):
@ -114,7 +145,7 @@ class polkit_applier(applier_frontend):
polfile.num_entries = len(self.control_settings)
polfile.entries = self.control_settings
print(polfile.__ndr_print__())
policy_writer = GPPolParser()
policy_writer.pol_file = polfile
policy_writer.write_xml('test_reg.xml')
@ -122,11 +153,12 @@ class polkit_applier(applier_frontend):
class applier:
def __init__(self, backend):
def __init__(self, sid, backend):
self.backend = backend
self.gpvalues = self.load_values()
logging.info('Values: {}'.format(self.gpvalues))
capplier = control_applier(self.gpvalues)
pkapplier = polkit_applier(self.gpvalues)
self.appliers = dict({ 'control': capplier, 'polkit': pkapplier })
def load_values(self):

19
gpoa/frontend/appliers/__init__.py Normal file → Executable file
View File

@ -1,6 +1,7 @@
#! /usr/bin/env python3
import subprocess
import threading
import os
import jinja2
class control:
@ -12,7 +13,7 @@ class control:
raise Exception('Unable to query possible values')
def _query_control_values(self):
proc = subprocess.Popen(['sudo', 'control', self.control_name, 'list'], stdout=subprocess.PIPE)
proc = subprocess.Popen(['control', self.control_name, 'list'], stdout=subprocess.PIPE)
for line in proc.stdout:
values = line.split()
return values
@ -25,7 +26,7 @@ class control:
return self.control_name
def get_control_status(self):
proc = subprocess.Popen(['sudo', 'control', self.control_name], stdout=subprocess.PIPE)
proc = subprocess.Popen(['control', self.control_name], stdout=subprocess.PIPE)
for line in proc.stdout:
return line.rstrip('\n\r')
@ -34,25 +35,25 @@ class control:
print('Setting control {} to {}'.format(self.control_name, status))
try:
proc = subprocess.Popen(['sudo', 'control', self.control_name, status], stdout=subprocess.PIPE)
proc = subprocess.Popen(['control', self.control_name, status], stdout=subprocess.PIPE)
except:
print('Unable to set {} to {}'.format(self.control_name, status))
class polkit:
__template_path = '/usr/lib/python3/site-packages/gpoa/templates'
__policy_dir = '/etc/polkit-1/rules.d'
__template_loader = jinja2.FileSystemLoader(searchpath='./')
__template_loader = jinja2.FileSystemLoader(searchpath=__template_path)
__template_environment = jinja2.Environment(loader=__template_loader)
def __init__(self, template_name, **kwargs):
def __init__(self, template_name, arglist):
self.template_name = template_name
self.args = kwargs
self.args = arglist
self.infilename = '{}.rules.j2'.format(self.template_name)
self.outfile = os.path.join(self.__policy_dir, '{}.rules'.format(self.template_name))
def generate(self):
template = self.__template_environment.get_template(self.infilename)
text = template.render(self.args)
text = template.render(**self.args)
with open(self.outfile, 'w') as f:
print(text)
f.write(text)

View File

@ -22,8 +22,7 @@ from samba.gp_parse.gp_pol import GPPolParser
# This is needed to query AD DOMAIN name from LDAP
# using cldap_netlogon (and to replace netads utility
# invocation helper).
from samba.dcerpc import netlogon
from samba.netcmd.common import netcmd_get_domain_infos_via_cldap
#from samba.dcerpc import netlogon
# Registry editing facilities are buggy. Use TDB module instead.
import tdb
@ -39,6 +38,7 @@ import pickle
# Our native control facility
#import appliers
import util
import frontend
# This is needed by helper functions and must be removed after
@ -54,6 +54,7 @@ import sys
# Remove print() from code
import logging
logging.basicConfig(level=logging.DEBUG)
class tdb_regedit:
'''
@ -87,17 +88,15 @@ class applier_backend:
def __init__(self):
pass
def get_cache(cache_file, default_cache_obj):
if not os.path.exists(cache_file):
logging.info('Initializing missing cache file: {}'.format(cache_file))
with open(cache_file, 'wb') as f:
pickle.dump(default_cache_obj, f, pickle.HIGHEST_PROTOCOL)
class local_policy_backend(applier_backend):
__default_policy_path = '/usr/lib/python3/site-packages/gpoa/local-policy/default.xml'
data= None
with open(cache_file, 'rb') as f:
data = pickle.load(f)
def __init__(self, username):
self.username = username
return data
def get_values(self):
policies = [frontend.load_xml_preg(self.__default_policy_path)]
return policies
class samba_backend(applier_backend):
_samba_registry_file = '/var/cache/samba/registry.tdb'
@ -157,7 +156,7 @@ class samba_backend(applier_backend):
cache_file = os.path.join(self.cache_dir, 'cache.pkl')
# Load PReg paths from cache at first
cache = get_cache(cache_file, dict())
cache = util.get_cache(cache_file, dict())
try:
gpos = get_gpo_list(dc, self.creds, self.loadparm, 'administrator')
@ -254,136 +253,69 @@ def parse_arguments():
arguments.add_argument('--dc',
type=str,
help='FQDN of the domain to replicate SYSVOL from')
arguments.add_argument('--nodomain',
action='store_true',
help='Operate without domain (apply local policy)')
return arguments.parse_args()
def get_gpo_list(dc_hostname, creds, lp, user):
gpos = []
ads = samba.gpo.ADS_STRUCT(dc_hostname, lp, creds)
if ads.connect():
#gpos = ads.get_gpo_list(creds.get_username())
gpos = ads.get_gpo_list(user)
logging.info('Got GPO list:')
for gpo in gpos:
# These setters are taken from libgpo/pygpo.c
# print(gpo.ds_path) # LDAP entry
logging.info('{} ({})'.format(gpo.display_name, gpo.name))
logging.info('------')
return gpos
def apply_samba_dc(arg_dc, arg_user):
sambaopts = options.SambaOptions(parser)
credopts = options.CredentialsOptions(parser)
# Initialize loadparm context
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp, fallback_machine=True)
def get_machine_domain():
pass
sid_cache = os.path.join(lp.get('cache directory'), 'sid_cache.pkl')
cached_sids = util.get_cache(sid_cache, dict())
def select_dc(lp, creds, dc):
samba_dc = get_dc_hostname(creds, lp)
util.machine_kinit()
util.check_krb_ticket()
if samba_dc != dc and dc != None:
print('Samba DC setting is {} and is overwritten by user setting {}'.format(samba_dc, dc))
return dc
return samba_dc
# Determine the default Samba DC for replication and try
# to overwrite it with user setting.
dc = util.select_dc(lp, creds, arg_dc)
def wbinfo_getsid(domain, user):
'''
Get SID using wbinfo
'''
# This part works only on client
username = '{}\\{}'.format(domain.upper(), user)
sid = pysss_nss_idmap.getsidbyname(username)
username = arg_user
domain = util.get_domain_name(lp, creds, dc)
sid = ''
if username in sid:
return sid[username]['sid']
domain_username = '{}\\{}'.format(domain, username)
if domain_username in cached_sids:
sid = cached_sids[domain_username]
logging.info('Got cached SID {} for user {}'.format(sid, domain_username))
# This part works only on DC
wbinfo_cmd = ['wbinfo', '-n', username]
output = subprocess.check_output(wbinfo_cmd)
sid = output.split()[0].decode('utf-8')
try:
sid = util.wbinfo_getsid(domain, username)
except:
logging.warning('Error getting SID using wbinfo, will use cached SID: {}'.format(sid))
return sid
logging.info('Working with SID: {}'.format(sid))
def get_machine_name():
'''
Get localhost name looking like DC0$
'''
return socket.gethostname().split('.', 1)[0].upper() + "$"
cached_sids[domain_username] = sid
with open(sid_cache, 'wb') as f:
pickle.dump(cached_sids, f, pickle.HIGHEST_PROTOCOL)
logging.info('Cached SID {} for user {}'.format(sid, domain_username))
def machine_kinit():
'''
Perform kinit with machine credentials
'''
host = get_machine_name()
subprocess.call(['kinit', '-k', host])
print('kinit succeed')
back = samba_backend(lp, creds, sid, dc, username)
def check_krb_ticket():
'''
Check if Kerberos 5 ticket present
'''
try:
subprocess.check_call([ 'klist', '-s' ])
output = subprocess.check_output('klist', stderr=subprocess.STDOUT).decode()
logging.info(output)
except:
logging.error('Kerberos ticket check unsuccessful')
sys.exit(1)
logging.info('Ticket check succeed')
appl = frontend.applier(sid, back)
appl.apply_parameters()
def get_domain_name(lp, creds, dc):
'''
Get current Active Directory domain name
'''
# Get CLDAP record about domain
# Look and python/samba/netcmd/domain.py for more examples
res = netcmd_get_domain_infos_via_cldap(lp, None, dc)
logging.info('Found domain via CLDAP: {}'.format(res.dns_domain))
return res.dns_domain
def apply_local_policy(user):
back = local_policy_backend(user)
appl = frontend.applier('local-{}'.format(user), back)
appl.apply_parameters()
def main():
#back = hreg_filesystem_backend(args.sid)
parser = optparse.OptionParser('GPO Applier')
sambaopts = options.SambaOptions(parser)
credopts = options.CredentialsOptions(parser)
# Initialize loadparm context
lp = sambaopts.get_loadparm()
creds = credopts.get_credentials(lp, fallback_machine=True)
sid_cache = os.path.join(lp.get('cache directory'), 'sid_cache.pkl')
cached_sids = get_cache(sid_cache, dict())
args = parse_arguments()
machine_kinit()
check_krb_ticket()
# Determine the default Samba DC for replication and try
# to overwrite it with user setting.
dc = select_dc(lp, creds, args.dc)
username = args.user
domain = get_domain_name(lp, creds, dc)
sid = ''
domain_username = '{}\\{}'.format(domain, username)
if domain_username in cached_sids:
sid = cached_sids[domain_username]
logging.info('Got cached SID {} for user {}'.format(sid, domain_username))
try:
sid = wbinfo_getsid(domain, username)
except:
logging.warning('Error getting SID using wbinfo, will use cached SID: {}'.format(sid))
logging.info('Working with SID: {}'.format(sid))
cached_sids[domain_username] = sid
with open(sid_cache, 'wb') as f:
pickle.dump(cached_sids, f, pickle.HIGHEST_PROTOCOL)
logging.info('Cached SID {} for user {}'.format(sid, domain_username))
back = samba_backend(lp, creds, sid, dc, username)
appl = frontend.applier(back)
appl.apply_parameters()
if args.nodomain:
logging.info('Working without domain - applying Local Policy')
apply_local_policy(args.user)
else:
logging.info('Working with Samba DC')
apply_samba_domain(args.dc, args.user)
if __name__ == "__main__":
main()

98
gpoa/util/__init__.py Normal file
View File

@ -0,0 +1,98 @@
import logging
import subprocess
import socket
import sys
import pickle
from samba.gpclass import get_dc_hostname
from samba.netcmd.common import netcmd_get_domain_infos_via_cldap
import samba.gpo
import pysss_nss_idmap
def get_gpo_list(dc_hostname, creds, lp, user):
gpos = []
ads = samba.gpo.ADS_STRUCT(dc_hostname, lp, creds)
if ads.connect():
#gpos = ads.get_gpo_list(creds.get_username())
gpos = ads.get_gpo_list(user)
logging.info('Got GPO list:')
for gpo in gpos:
# These setters are taken from libgpo/pygpo.c
# print(gpo.ds_path) # LDAP entry
logging.info('{} ({})'.format(gpo.display_name, gpo.name))
logging.info('------')
return gpos
def get_machine_domain():
pass
def select_dc(lp, creds, dc):
samba_dc = get_dc_hostname(creds, lp)
if samba_dc != dc and dc != None:
print('Samba DC setting is {} and is overwritten by user setting {}'.format(samba_dc, dc))
return dc
return samba_dc
def wbinfo_getsid(domain, user):
'''
Get SID using wbinfo
'''
# This part works only on client
username = '{}\\{}'.format(domain.upper(), user)
sid = pysss_nss_idmap.getsidbyname(username)
if username in sid:
return sid[username]['sid']
# This part works only on DC
wbinfo_cmd = ['wbinfo', '-n', username]
output = subprocess.check_output(wbinfo_cmd)
sid = output.split()[0].decode('utf-8')
return sid
def machine_kinit():
'''
Perform kinit with machine credentials
'''
host = socket.gethostname().split('.', 1)[0].upper() + "$"
subprocess.call(['kinit', '-k', host])
print('kinit succeed')
def check_krb_ticket():
'''
Check if Kerberos 5 ticket present
'''
try:
subprocess.check_call([ 'klist', '-s' ])
output = subprocess.check_output('klist', stderr=subprocess.STDOUT).decode()
logging.info(output)
except:
logging.error('Kerberos ticket check unsuccessful')
sys.exit(1)
logging.info('Ticket check succeed')
def get_domain_name(lp, creds, dc):
'''
Get current Active Directory domain name
'''
# Get CLDAP record about domain
# Look and python/samba/netcmd/domain.py for more examples
res = netcmd_get_domain_infos_via_cldap(lp, None, dc)
logging.info('Found domain via CLDAP: {}'.format(res.dns_domain))
return res.dns_domain
def get_cache(cache_file, default_cache_obj):
if not os.path.exists(cache_file):
logging.info('Initializing missing cache file: {}'.format(cache_file))
with open(cache_file, 'wb') as f:
pickle.dump(default_cache_obj, f, pickle.HIGHEST_PROTOCOL)
data = None
with open(cache_file, 'rb') as f:
data = pickle.load(f)
return data