# gp_cert_auto_enroll_ext samba group policy # Copyright (C) David Mulder 2021 # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . import os from samba.gpclass import gp_pol_ext from samba import Ldb from ldb import SCOPE_SUBTREE from samba.auth import system_session from samba.gpclass import get_dc_hostname import base64 from tempfile import NamedTemporaryFile from shutil import move, which from subprocess import Popen, PIPE import re from glob import glob import json cert_wrap = b""" -----BEGIN CERTIFICATE----- %s -----END CERTIFICATE-----""" global_trust_dir = '/etc/pki/trust/anchors' def fetch_certification_authorities(ldb): result = [] basedn = ldb.get_default_basedn() dn = 'CN=Certification Authorities,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn expr = '(objectClass=certificationAuthority)' res = ldb.search(dn, SCOPE_SUBTREE, expr, ['cn']) if len(res) == 0: return result dn = 'CN=Enrollment Services,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn attrs = ['cACertificate', 'cn', 'certificateTemplates', 'dNSHostName'] for ca in res: expr = '(cn=%s)' % ca['cn'] res2 = ldb.search(dn, SCOPE_SUBTREE, expr, attrs) if len(res) != 1: continue templates = {} for template in res2[0]['certificateTemplates']: templates[template] = fetch_template_attrs(ldb, template) res = dict(res2[0]) res['certificateTemplates'] = templates result.append(res) return result def fetch_template_attrs(ldb, name, attrs=['msPKI-Minimal-Key-Size']): basedn = ldb.get_default_basedn() dn = 'CN=Certificate Templates,CN=Public Key Services,CN=Services,CN=Configuration,%s' % basedn expr = '(cn=%s)' % name res = ldb.search(dn, SCOPE_SUBTREE, expr, attrs) if len(res) == 1 and 'msPKI-Minimal-Key-Size' in res[0]: return dict(res[0]) else: return {'msPKI-Minimal-Key-Size': ['2048']} def format_root_cert(cert): cert = base64.b64encode(cert) return cert_wrap % re.sub(b"(.{64})", b"\\1\n", cert, 0, re.DOTALL) def find_cepces_submit(): certmonger_dirs = [os.environ.get("PATH"), '/usr/lib/certmonger', '/usr/libexec/certmonger'] return which('cepces-submit', path=':'.join(certmonger_dirs)) def get_supported_templates(server, logger): cepces_submit = find_cepces_submit() if os.path.exists(cepces_submit): env = os.environ env['CERTMONGER_OPERATION'] = 'GET-SUPPORTED-TEMPLATES' p = Popen([cepces_submit, '--server=%s' % server], env=env, stdout=PIPE, stderr=PIPE) out, err = p.communicate() if p.returncode != 0: logger.warn('Failed to fetch the list of supported templates:' + '\n%s' % err) return out.strip().split() return [] def cert_enroll(ca, trust_dir, private_dir, logger): # Install the root certificate chain data = {'files': [], 'templates': []} sscep = which('sscep') if sscep is not None: url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % \ ca['dNSHostName'][0] root_cert = os.path.join(trust_dir, '%s.crt' % ca['cn']) ret = Popen([sscep, 'getca', '-F', 'sha1', '-c', root_cert, '-u', url]).wait() if ret != 0: logger.warn('sscep failed to fetch the root certificate chain.') logger.warn('Ensure you have installed and configured the' + ' Network Device Enrollment Service.') root_certs = glob('%s*' % root_cert) data['files'].extend(root_certs) for src in root_certs: # Symlink the certs to global trust dir dst = os.path.join(global_trust_dir, os.path.basename(src)) try: os.symlink(src, dst) data['files'].append(dst) except PermissionError: logger.warn('Failed to symlink root certificate to the' + ' admin trust anchors') except FileNotFoundError: logger.warn('Failed to symlink root certificate to the' + ' admin trust anchors.' + ' The directory %s was not found' % \ global_trust_dir) except FileExistsError: # If we're simply downloading a renewed cert, the symlink # already exists. Ignore the FileExistsError. Preserve the # existing symlink in the unapply data. data['files'].append(dst) else: logger.warn('sscep is not installed, which prevents the installation' + ' of the root certificate chain.') update = which('update-ca-certificates') if update is not None: Popen([update]).wait() # Setup Certificate Auto Enrollment getcert = which('getcert') cepces_submit = find_cepces_submit() if getcert is not None and os.path.exists(cepces_submit): Popen([getcert, 'add-ca', '-c', ca['cn'][0], '-e', '%s --server=%s' % (cepces_submit, ca['dNSHostName'][0])]).wait() supported_templates = get_supported_templates(ca['dNSHostName'][0], logger) for template, attrs in ca['certificateTemplates'].items(): if template not in supported_templates: continue nickname = '%s.%s' % (ca['cn'][0], template.decode()) keyfile = os.path.join(private_dir, '%s.key' % nickname) certfile = os.path.join(trust_dir, '%s.crt' % nickname) Popen([getcert, 'request', '-c', ca['cn'][0], '-T', template.decode(), '-I', nickname, '-k', keyfile, '-f', certfile, '-g', attrs['msPKI-Minimal-Key-Size'][0]]).wait() data['files'].extend([keyfile, certfile]) data['templates'].append(nickname) if update is not None: Popen([update]).wait() else: logger.warn('certmonger and cepces must be installed for ' + 'certificate auto enrollment to work') return json.dumps(data) class gp_cert_auto_enroll_ext(gp_pol_ext): def __str__(self): return 'Cryptography\AutoEnrollment' def process_group_policy(self, deleted_gpo_list, changed_gpo_list, trust_dir=None, private_dir=None): if trust_dir is None: trust_dir = self.lp.cache_path('certs') if private_dir is None: private_dir = self.lp.private_path('certs') if not os.path.exists(trust_dir): os.mkdir(trust_dir, mode=0o755) if not os.path.exists(private_dir): os.mkdir(private_dir, mode=0o700) for guid, settings in deleted_gpo_list: self.gp_db.set_guid(guid) if str(self) in settings: for ca_cn_enc, data in settings[str(self)].items(): ca_cn = base64.b64decode(ca_cn_enc) data = json.loads(data) getcert = which('getcert') if getcert is not None: Popen([getcert, 'remove-ca', '-c', ca_cn]).wait() for nickname in data['templates']: Popen([getcert, 'stop-tracking', '-i', nickname]).wait() for f in data['files']: if os.path.exists(f): os.unlink(f) self.gp_db.delete(str(self), ca_cn_enc) self.gp_db.commit() for gpo in changed_gpo_list: if gpo.file_sys_path: section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment' self.gp_db.set_guid(gpo.name) pol_file = 'MACHINE/Registry.pol' path = os.path.join(gpo.file_sys_path, pol_file) pol_conf = self.parse(path) if not pol_conf: continue for e in pol_conf.entries: if e.keyname == section and e.valuename == 'AEPolicy': # This policy applies as specified in [MS-CAESO] 4.4.5.1 if e.data == 0x8000: continue # The policy is disabled enroll = e.data & 0x1 == 1 manage = e.data & 0x2 == 1 retrive_pending = e.data & 0x4 == 1 if enroll: url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp) ldb = Ldb(url=url, session_info=system_session(), lp=self.lp, credentials=self.creds) cas = fetch_certification_authorities(ldb) for ca in cas: data = cert_enroll(ca, trust_dir, private_dir, self.logger) self.gp_db.store(str(self), base64.b64encode(ca['cn'][0]).decode(), data) self.gp_db.commit() def rsop(self, gpo): output = {} pol_file = 'MACHINE/Registry.pol' section = 'Software\Policies\Microsoft\Cryptography\AutoEnrollment' if gpo.file_sys_path: path = os.path.join(gpo.file_sys_path, pol_file) pol_conf = self.parse(path) if not pol_conf: return output for e in pol_conf.entries: if e.keyname == section and e.valuename == 'AEPolicy': enroll = e.data & 0x1 == 1 if e.data == 0x8000 or not enroll: continue output['Auto Enrollment Policy'] = {} url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp) ldb = Ldb(url=url, session_info=system_session(), lp=self.lp, credentials=self.creds) cas = fetch_certification_authorities(ldb) for ca in cas: policy = 'Auto Enrollment Policy' cn = ca['cn'][0] output[policy][cn] = {} output[policy][cn]['CA Certificate'] = \ format_root_cert(ca['cACertificate'][0]).decode() output[policy][cn]['Auto Enrollment Server'] = \ ca['dNSHostName'][0] supported_templates = \ get_supported_templates(ca['dNSHostName'][0], self.logger) output[policy][cn]['Templates'] = \ [t.decode() for t in supported_templates] return output