1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-26 10:04:02 +03:00

gpo: Add Cert Auto Enroll Advanced Config

Advanced configuration for Certifcate Auto
Enrollment is stored on the sysvol, and needs
to be parsed/used when provided.

Signed-off-by: David Mulder <dmulder@suse.com>
Reviewed-by: Jeremy Allison <jra@samba.org>

Autobuild-User(master): Jeremy Allison <jra@samba.org>
Autobuild-Date(master): Tue May  3 21:48:57 UTC 2022 on sn-devel-184
This commit is contained in:
David Mulder 2022-04-12 12:50:25 -06:00 committed by Jeremy Allison
parent a54d707435
commit ddeedcb6b2
2 changed files with 183 additions and 14 deletions

View File

@ -15,9 +15,10 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import operator
from samba.gpclass import gp_pol_ext
from samba import Ldb
from ldb import SCOPE_SUBTREE
from ldb import SCOPE_SUBTREE, SCOPE_BASE
from samba.auth import system_session
from samba.gpclass import get_dc_hostname
import base64
@ -35,6 +36,8 @@ cert_wrap = b"""
%s
-----END CERTIFICATE-----"""
global_trust_dir = '/etc/pki/trust/anchors'
endpoint_re = '(https|HTTPS)://(?P<server>[a-zA-Z0-9.-]+)/ADPolicyProvider' + \
'_CEP_(?P<auth>[a-zA-Z]+)/service.svc/CEP'
def octet_string_to_objectGUID(data):
return '%s-%s-%s-%s-%s' % ('%02x' % struct.unpack('<L', data[0:4])[0],
@ -43,6 +46,84 @@ def octet_string_to_objectGUID(data):
'%02x' % struct.unpack('>H', data[8:10])[0],
'%02x%02x' % struct.unpack('>HL', data[10:]))
'''
Group and Sort End Point Information
[MS-CAESO] 4.4.5.3.2.3
In this step autoenrollment processes the end point information by grouping it
by CEP ID and sorting in the order with which it will use the end point to
access the CEP information.
'''
def group_and_sort_end_point_information(end_point_information):
# Create groups of the CertificateEnrollmentPolicyEndPoint instances that
# have the same value of the EndPoint.PolicyID datum.
end_point_groups = {}
for e in end_point_information:
if e['PolicyID'] not in end_point_groups.keys():
end_point_groups[e['PolicyID']] = []
end_point_groups[e['PolicyID']].append(e)
# Sort each group by following these rules:
for end_point_group in end_point_groups.values():
# Sort the CertificateEnrollmentPolicyEndPoint instances in ascending
# order based on the EndPoint.Cost value.
end_point_group.sort(key=lambda e: e['Cost'])
# For instances that have the same EndPoint.Cost:
cost_list = [e['Cost'] for e in end_point_group]
costs = set(cost_list)
for cost in costs:
i = cost_list.index(cost)
j = len(cost_list)-operator.indexOf(reversed(cost_list), cost)-1
if i == j:
continue
# Sort those that have EndPoint.Authentication equal to Kerberos
# first. Then sort those that have EndPoint.Authentication equal to
# Anonymous. The rest of the CertificateEnrollmentPolicyEndPoint
# instances follow in an arbitrary order.
def sort_auth(e):
# 0x2 - Kerberos
if e['AuthFlags'] == 0x2:
return 0
# 0x1 - Anonymous
elif e['AuthFlags'] == 0x1:
return 1
else:
return 2
end_point_group[i:j+1] = sorted(end_point_group[i:j+1],
key=sort_auth)
return list(end_point_groups.values())
'''
Obtaining End Point Information
[MS-CAESO] 4.4.5.3.2.2
In this step autoenrollment initializes the
CertificateEnrollmentPolicyEndPoints table.
'''
def obtain_end_point_information(entries):
end_point_information = {}
section = 'Software\\Policies\\Microsoft\\Cryptography\\PolicyServers\\'
for e in entries:
if not e.keyname.startswith(section):
continue
name = e.keyname.replace(section, '')
if name not in end_point_information.keys():
end_point_information[name] = {}
end_point_information[name][e.valuename] = e.data
for ca in end_point_information.values():
m = re.match(endpoint_re, ca['URL'])
if m:
name = '%s-CA' % m.group('server').replace('.', '-')
ca['name'] = name
ca['hostname'] = m.group('server')
ca['auth'] = m.group('auth')
else:
edata = { 'endpoint': ca['URL'] }
log.error('Failed to parse the endpoint', edata)
end_point_information = \
group_and_sort_end_point_information(end_point_information.values())
return end_point_information
'''
Initializing CAs
[MS-CAESO] 4.4.5.3.1.2
@ -228,18 +309,96 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
manage = e.data & 0x2 == 1
retrive_pending = e.data & 0x4 == 1
if enroll:
url = 'ldap://%s' % get_dc_hostname(self.creds,
self.lp)
ldb = Ldb(url=url, session_info=system_session(),
lp=self.lp, credentials=self.creds)
cas = fetch_certification_authorities(ldb)
for ca in cas:
data = cert_enroll(ca, ldb, trust_dir, private_dir)
self.gp_db.store(str(self),
base64.b64encode(ca['name']).decode(),
data)
self.__enroll(pol_conf.entries, trust_dir,
private_dir)
self.gp_db.commit()
'''
Read CEP Data
[MS-CAESO] 4.4.5.3.2.4
In this step autoenrollment initializes instances of the
CertificateEnrollmentPolicy by accessing end points associated with CEP
groups created in the previous step.
'''
def __read_cep_data(self, ldb, end_point_information,
trust_dir, private_dir):
# For each group created in the previous step:
for end_point_group in end_point_information:
# Pick an arbitrary instance of the
# CertificateEnrollmentPolicyEndPoint from the group
e = end_point_group[0]
# If this instance does not have the AutoEnrollmentEnabled flag set
# in the EndPoint.Flags, continue with the next group.
if not e['Flags'] & 0x10:
continue
# If the current group contains a
# CertificateEnrollmentPolicyEndPoint instance with EndPoint.URI
# equal to "LDAP":
if any([e['URL'] == 'LDAP:' for e in end_point_group]):
# Perform an LDAP search to read the value of the objectGuid
# attribute of the root object of the forest root domain NC. If
# any errors are encountered, continue with the next group.
res = ldb.search('', SCOPE_BASE, '(objectClass=*)',
['rootDomainNamingContext'])
if len(res) != 1:
continue
res2 = ldb.search(res[0]['rootDomainNamingContext'][0],
SCOPE_BASE, '(objectClass=*)',
['objectGUID'])
if len(res2) != 1:
continue
# Compare the value read in the previous step to the
# EndPoint.PolicyId datum CertificateEnrollmentPolicyEndPoint
# instance. If the values do not match, continue with the next
# group.
objectGUID = '{%s}' % \
octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper()
if objectGUID != e['PolicyID']:
continue
# For each CertificateEnrollmentPolicyEndPoint instance for that
# group:
for ca in end_point_group:
# If EndPoint.URI equals "LDAP":
if ca['URL'] == 'LDAP:':
# This is a basic configuration.
cas = fetch_certification_authorities(ldb)
for ca in cas:
data = cert_enroll(ca, ldb, trust_dir, private_dir)
self.gp_db.store(str(self),
base64.b64encode(ca['name']).decode(),
data)
# If EndPoint.URI starts with "HTTPS//":
elif ca['URL'].lower().startswith('https://'):
data = cert_enroll(ca, ldb, trust_dir,
private_dir, auth=ca['auth'])
self.gp_db.store(str(self),
base64.b64encode(ca['name'].encode()).decode(),
data)
else:
edata = { 'endpoint': ca['URL'] }
log.error('Unrecognized endpoint', edata)
def __enroll(self, entries, trust_dir, private_dir):
url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
ldb = Ldb(url=url, session_info=system_session(),
lp=self.lp, credentials=self.creds)
end_point_information = obtain_end_point_information(entries)
if len(end_point_information) > 0:
for end_point_group in end_point_information:
self.__read_cep_data(ldb, end_point_information,
trust_dir, private_dir)
else:
cas = fetch_certification_authorities(ldb)
for ca in cas:
data = cert_enroll(ca, ldb, trust_dir, private_dir)
self.gp_db.store(str(self),
base64.b64encode(ca['name']).decode(), data)
def rsop(self, gpo):
output = {}
pol_file = 'MACHINE/Registry.pol'
@ -258,15 +417,26 @@ class gp_cert_auto_enroll_ext(gp_pol_ext):
url = 'ldap://%s' % get_dc_hostname(self.creds, self.lp)
ldb = Ldb(url=url, session_info=system_session(),
lp=self.lp, credentials=self.creds)
end_point_information = \
obtain_end_point_information(pol_conf.entries)
cas = fetch_certification_authorities(ldb)
if len(end_point_information) > 0:
cas2 = [ep for sl in end_point_information for ep in sl]
if any([ca['URL'] == 'LDAP:' for ca in cas2]):
cas.extend(cas2)
else:
cas = cas2
for ca in cas:
if 'URL' in ca and ca['URL'] == 'LDAP:':
continue
policy = 'Auto Enrollment Policy'
cn = ca['name']
if policy not in output:
output[policy] = {}
output[policy][cn] = {}
output[policy][cn]['CA Certificate'] = \
format_root_cert(ca['cACertificate']).decode()
if 'cACertificate' in ca:
output[policy][cn]['CA Certificate'] = \
format_root_cert(ca['cACertificate']).decode()
output[policy][cn]['Auto Enrollment Server'] = \
ca['hostname']
supported_templates = \

View File

@ -1 +0,0 @@
samba.tests.gpo.samba.tests.gpo.GPOTests.test_advanced_gp_cert_auto_enroll_ext