mirror of
https://github.com/samba-team/samba.git
synced 2025-08-08 13:49:29 +03:00
gpo: Add Certificate Auto Enrollment Policy
Signed-off-by: David Mulder <dmulder@suse.com> Reviewed-by: Jeremy Allison <jra@samba.org>
This commit is contained in:
committed by
Jeremy Allison
parent
cca9ce5977
commit
9c0a174af2
244
python/samba/gp_cert_auto_enroll_ext.py
Normal file
244
python/samba/gp_cert_auto_enroll_ext.py
Normal file
@ -0,0 +1,244 @@
|
||||
# gp_cert_auto_enroll_ext samba group policy
|
||||
# Copyright (C) David Mulder <dmulder@suse.com> 2021
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
import os
|
||||
from samba.gpclass import gp_pol_ext
|
||||
from samba import Ldb
|
||||
from ldb import SCOPE_SUBTREE
|
||||
from samba.auth import system_session
|
||||
from samba.gpclass import get_dc_hostname
|
||||
import base64
|
||||
from tempfile import NamedTemporaryFile
|
||||
from shutil import move, which
|
||||
from subprocess import Popen, PIPE
|
||||
import re
|
||||
from glob import glob
|
||||
import json
|
||||
|
||||
cert_wrap = b"""
|
||||
-----BEGIN CERTIFICATE-----
|
||||
%s
|
||||
-----END CERTIFICATE-----"""
|
||||
global_trust_dir = '/etc/pki/trust/anchors'
|
||||
|
||||
def fetch_certification_authorities(ldb):
|
||||
result = []
|
||||
basedn = ldb.get_default_basedn()
|
||||
dn = 'CN=Certification Authorities,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
|
||||
expr = '(objectClass=certificationAuthority)'
|
||||
res = ldb.search(dn, SCOPE_SUBTREE, expr, ['cn'])
|
||||
if len(res) == 0:
|
||||
return result
|
||||
dn = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
|
||||
attrs = ['cACertificate', 'cn', 'certificateTemplates', 'dNSHostName']
|
||||
for ca in res:
|
||||
expr = '(cn=%s)' % ca['cn']
|
||||
res2 = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
|
||||
if len(res) != 1:
|
||||
continue
|
||||
templates = {}
|
||||
for template in res2[0]['certificateTemplates']:
|
||||
templates[template] = fetch_template_attrs(ldb, template)
|
||||
res = dict(res2[0])
|
||||
res['certificateTemplates'] = templates
|
||||
result.append(res)
|
||||
return result
|
||||
|
||||
def fetch_template_attrs(ldb, name, attrs=['msPKI-Minimal-Key-Size']):
|
||||
basedn = ldb.get_default_basedn()
|
||||
dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn
|
||||
expr = '(cn=%s)' % name
|
||||
res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs)
|
||||
if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]:
|
||||
return dict(res[0])
|
||||
else:
|
||||
return {'msPKI-Minimal-Key-Size': ['2048']}
|
||||
|
||||
def format_root_cert(cert):
|
||||
cert = base64.b64encode(cert)
|
||||
return cert_wrap % re.sub(b"(.{64})", b"\\1\n", cert, 0, re.DOTALL)
|
||||
|
||||
def find_cepces_submit():
|
||||
certmonger_dirs = [os.environ.get("PATH"), '/usr/lib/certmonger',
|
||||
'/usr/libexec/certmonger']
|
||||
return which('cepces-submit', path=':'.join(certmonger_dirs))
|
||||
|
||||
def get_supported_templates(server):
|
||||
cepces_submit = find_cepces_submit()
|
||||
if os.path.exists(cepces_submit):
|
||||
env = os.environ
|
||||
env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES'
|
||||
out, _ = Popen([cepces_submit, '--server=%s' % server], env=env,
|
||||
stdout=PIPE, stderr=PIPE).communicate()
|
||||
return out.strip().split()
|
||||
return []
|
||||
|
||||
def cert_enroll(ca, trust_dir, private_dir, logger):
|
||||
# Install the root certificate chain
|
||||
data = {'files': [], 'templates': []}
|
||||
sscep = which('sscep')
|
||||
if sscep is not None:
|
||||
url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % \
|
||||
ca['dNSHostName'][0]
|
||||
root_cert = os.path.join(trust_dir, '%s.crt' % ca['cn'])
|
||||
ret = Popen([sscep, 'getca', '-F', 'sha1', '-c',
|
||||
root_cert, '-u', url]).wait()
|
||||
if ret != 0:
|
||||
logger.warn('sscep failed to fetch the root certificate chain.')
|
||||
root_certs = glob('%s*' % root_cert)
|
||||
data['files'].extend(root_certs)
|
||||
for src in root_certs:
|
||||
# Symlink the certs to global trust dir
|
||||
dst = os.path.join(global_trust_dir, os.path.basename(src))
|
||||
try:
|
||||
os.symlink(src, dst)
|
||||
data['files'].append(dst)
|
||||
except PermissionError:
|
||||
logger.warn('Failed to symlink root certificate to the' +
|
||||
' admin trust anchors')
|
||||
except FileNotFoundError:
|
||||
logger.warn('Failed to symlink root certificate to the' +
|
||||
' admin trust anchors.' +
|
||||
' The directory %s was not found' % \
|
||||
global_trust_dir)
|
||||
else:
|
||||
logger.warn('sscep is not installed, which prevents the installation' +
|
||||
' of the root certificate chain.')
|
||||
update = which('update-ca-certificates')
|
||||
if update is not None:
|
||||
Popen([update]).wait()
|
||||
# Setup Certificate Auto Enrollment
|
||||
getcert = which('getcert')
|
||||
cepces_submit = find_cepces_submit()
|
||||
if getcert is not None and os.path.exists(cepces_submit):
|
||||
Popen([getcert, 'add-ca', '-c', ca['cn'][0], '-e',
|
||||
'%s --server=%s' % (cepces_submit, ca['dNSHostName'][0])]).wait()
|
||||
supported_templates = get_supported_templates(ca['dNSHostName'][0])
|
||||
for template, attrs in ca['certificateTemplates'].items():
|
||||
if template not in supported_templates:
|
||||
continue
|
||||
nickname = '%s.%s' % (ca['cn'][0], template.decode())
|
||||
keyfile = os.path.join(private_dir, '%s.key' % nickname)
|
||||
certfile = os.path.join(trust_dir, '%s.crt' % nickname)
|
||||
Popen([getcert, 'request', '-c', ca['cn'][0],
|
||||
'-T', template.decode(),
|
||||
'-I', nickname, '-k', keyfile, '-f', certfile,
|
||||
'-g', attrs['msPKI-Minimal-Key-Size'][0]]).wait()
|
||||
data['files'].extend([keyfile, certfile])
|
||||
data['templates'].append(nickname)
|
||||
if update is not None:
|
||||
Popen([update]).wait()
|
||||
else:
|
||||
logger.warn('certmonger and cepces must be installed for ' +
|
||||
'certificate auto enrollment to work')
|
||||
return json.dumps(data)
|
||||
|
||||
class gp_cert_auto_enroll_ext(gp_pol_ext):
|
||||
def __str__(self):
|
||||
return 'Cryptography\AutoEnrollment'
|
||||
|
||||
def process_group_policy(self, deleted_gpo_list, changed_gpo_list,
|
||||
trust_dir=None, private_dir=None):
|
||||
if trust_dir is None:
|
||||
trust_dir = self.lp.cache_path('certs')
|
||||
if private_dir is None:
|
||||
private_dir = self.lp.private_path('certs')
|
||||
if not os.path.exists(trust_dir):
|
||||
os.mkdir(trust_dir, mode=0o755)
|
||||
if not os.path.exists(private_dir):
|
||||
os.mkdir(private_dir, mode=0o700)
|
||||
|
||||
for guid, settings in deleted_gpo_list:
|
||||
self.gp_db.set_guid(guid)
|
||||
if str(self) in settings:
|
||||
for ca_cn_enc, data in settings[str(self)].items():
|
||||
ca_cn = base64.b64decode(ca_cn_enc)
|
||||
data = json.loads(data)
|
||||
getcert = which('getcert')
|
||||
if getcert is not None:
|
||||
Popen([getcert, 'remove-ca', '-c', ca_cn]).wait()
|
||||
for nickname in data['templates']:
|
||||
Popen([getcert, 'stop-tracking',
|
||||
'-i', nickname]).wait()
|
||||
for f in data['files']:
|
||||
if os.path.exists(f):
|
||||
os.unlink(f)
|
||||
self.gp_db.delete(str(self), ca_cn_enc)
|
||||
self.gp_db.commit()
|
||||
|
||||
for gpo in changed_gpo_list:
|
||||
if gpo.file_sys_path:
|
||||
section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
|
||||
self.gp_db.set_guid(gpo.name)
|
||||
pol_file = 'MACHINE/Registry.pol'
|
||||
path = os.path.join(gpo.file_sys_path, pol_file)
|
||||
pol_conf = self.parse(path)
|
||||
if not pol_conf:
|
||||
continue
|
||||
for e in pol_conf.entries:
|
||||
if e.keyname == section and e.valuename == 'AEPolicy':
|
||||
# This policy applies as specified in [MS-CAESO] 4.4.5.1
|
||||
if e.data == 0x8000:
|
||||
continue # The policy is disabled
|
||||
enroll = e.data & 0x1 == 1
|
||||
manage = e.data & 0x2 == 1
|
||||
retrive_pending = e.data & 0x4 == 1
|
||||
if enroll:
|
||||
url = 'ldap://%s' % get_dc_hostname(self.creds,
|
||||
self.lp)
|
||||
ldb = Ldb(url=url, session_info=system_session(),
|
||||
lp=self.lp, credentials=self.creds)
|
||||
cas = fetch_certification_authorities(ldb)
|
||||
for ca in cas:
|
||||
data = cert_enroll(ca, trust_dir,
|
||||
private_dir, self.logger)
|
||||
self.gp_db.store(str(self),
|
||||
base64.b64encode(ca['cn'][0]).decode(),
|
||||
data)
|
||||
self.gp_db.commit()
|
||||
|
||||
def rsop(self, gpo):
|
||||
output = {}
|
||||
pol_file = 'MACHINE/Registry.pol'
|
||||
section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment'
|
||||
if gpo.file_sys_path:
|
||||
path = os.path.join(gpo.file_sys_path, pol_file)
|
||||
pol_conf = self.parse(path)
|
||||
if not pol_conf:
|
||||
return output
|
||||
for e in pol_conf.entries:
|
||||
if e.keyname == section and e.valuename == 'AEPolicy':
|
||||
enroll = e.data & 0x1 == 1
|
||||
if e.data == 0x8000 or not enroll:
|
||||
continue
|
||||
output['Auto Enrollment Policy'] = {}
|
||||
url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
|
||||
ldb = Ldb(url=url, session_info=system_session(),
|
||||
lp=self.lp, credentials=self.creds)
|
||||
cas = fetch_certification_authorities(ldb)
|
||||
for ca in cas:
|
||||
policy = 'Auto Enrollment Policy'
|
||||
cn = ca['cn'][0]
|
||||
output[policy][cn] = {}
|
||||
output[policy][cn]['CA Certificate'] = \
|
||||
format_root_cert(ca['cACertificate'][0]).decode()
|
||||
output[policy][cn]['Auto Enrollment Server'] = \
|
||||
ca['dNSHostName'][0]
|
||||
supported_templates = \
|
||||
get_supported_templates(ca['dNSHostName'][0])
|
||||
output[policy][cn]['Templates'] = \
|
||||
[t.decode() for t in supported_templates]
|
||||
return output
|
@ -45,6 +45,7 @@ from samba.vgp_issue_ext import vgp_issue_ext
|
||||
from samba.vgp_startup_scripts_ext import vgp_startup_scripts_ext
|
||||
from samba.vgp_access_ext import vgp_access_ext
|
||||
from samba.gp_gnome_settings_ext import gp_gnome_settings_ext
|
||||
from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext
|
||||
import logging
|
||||
|
||||
if __name__ == "__main__":
|
||||
@ -109,6 +110,7 @@ if __name__ == "__main__":
|
||||
gp_extensions.append(vgp_startup_scripts_ext)
|
||||
gp_extensions.append(vgp_access_ext)
|
||||
gp_extensions.append(gp_gnome_settings_ext)
|
||||
gp_extensions.append(gp_cert_auto_enroll_ext)
|
||||
gp_extensions.extend(machine_exts)
|
||||
elif opts.target == 'User':
|
||||
gp_extensions.extend(user_exts)
|
||||
|
Reference in New Issue
Block a user