From 0d1ff69936f18ea729fc11fbbb1569a833302572 Mon Sep 17 00:00:00 2001 From: Gabriel Nagy Date: Mon, 8 Jan 2024 18:05:08 +0200 Subject: [PATCH] gpo: Test certificate policy without NDES As of 8231eaf856b, the NDES feature is no longer required on Windows, as cert auto-enroll can use the certificate from the LDAP request. However, 157335ee93e changed the implementation to convert the LDAP certificate to base64 due to it failing to cleanly convert to a string. Because of insufficient test coverage I missed handling the part where NDES is disabled or not reachable and the LDAP certificate was imported. The call to load_der_x509_certificate now fails with an error because it expects binary data, yet it receives a base64 encoded string. This adds a test to confirm the issue. BUG: https://bugzilla.samba.org/show_bug.cgi?id=15557 Signed-off-by: Gabriel Nagy Reviewed-by: David Mulder Reviewed-by: Andreas Schneider --- python/samba/tests/gpo.py | 126 ++++++++++++++++++++++++++++++++++++-- selftest/knownfail.d/gpo | 1 + 2 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 selftest/knownfail.d/gpo diff --git a/python/samba/tests/gpo.py b/python/samba/tests/gpo.py index 81bf3b8c03b..9177eef5afa 100644 --- a/python/samba/tests/gpo.py +++ b/python/samba/tests/gpo.py @@ -103,17 +103,21 @@ def dummy_certificate(): # Dummy requests structure for Certificate Auto Enrollment class dummy_requests(object): - @staticmethod - def get(url=None, params=None): + class exceptions(object): + ConnectionError = Exception + + def __init__(self, want_exception=False): + self.want_exception = want_exception + + def get(self, url=None, params=None): + if self.want_exception: + raise self.exceptions.ConnectionError + dummy = requests.Response() dummy._content = dummy_certificate() dummy.headers = {'Content-Type': 'application/x-x509-ca-cert'} return dummy - class exceptions(object): - ConnectionError = Exception -cae.requests = dummy_requests - realm = os.environ.get('REALM') policies = realm + '/POLICIES' realm = realm.lower() @@ -6906,6 +6910,114 @@ class GPOTests(tests.TestCase): # Unstage the Registry.pol file unstage_file(reg_pol) + def test_gp_cert_auto_enroll_ext_without_ndes(self): + local_path = self.lp.cache_path('gpo_cache') + guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}' + reg_pol = os.path.join(local_path, policies, guid, + 'MACHINE/REGISTRY.POL') + cache_dir = self.lp.get('cache directory') + store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb')) + + machine_creds = Credentials() + machine_creds.guess(self.lp) + machine_creds.set_machine_account() + + # Initialize the group policy extension + cae.requests = dummy_requests(want_exception=True) + ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, + machine_creds.get_username(), store) + + gpos = get_gpo_list(self.server, machine_creds, self.lp, + machine_creds.get_username()) + + # Stage the Registry.pol file with test data + parser = GPPolParser() + parser.load_xml(etree.fromstring(auto_enroll_reg_pol.strip())) + ret = stage_file(reg_pol, ndr_pack(parser.pol_file)) + self.assertTrue(ret, 'Could not create the target %s' % reg_pol) + + # Write the dummy CA entry, Enrollment Services, and Templates Entries + admin_creds = Credentials() + admin_creds.set_username(os.environ.get('DC_USERNAME')) + admin_creds.set_password(os.environ.get('DC_PASSWORD')) + admin_creds.set_realm(os.environ.get('REALM')) + hostname = get_dc_hostname(machine_creds, self.lp) + url = 'ldap://%s' % hostname + ldb = Ldb(url=url, session_info=system_session(), + lp=self.lp, credentials=admin_creds) + # Write the dummy CA + confdn = 'CN=Public Key Services,CN=Services,CN=Configuration,%s' % base_dn + ca_cn = '%s-CA' % hostname.replace('.', '-') + certa_dn = 'CN=%s,CN=Certification Authorities,%s' % (ca_cn, confdn) + ldb.add({'dn': certa_dn, + 'objectClass': 'certificationAuthority', + 'authorityRevocationList': ['XXX'], + 'cACertificate': dummy_certificate(), + 'certificateRevocationList': ['XXX'], + }) + # Write the dummy pKIEnrollmentService + enroll_dn = 'CN=%s,CN=Enrollment Services,%s' % (ca_cn, confdn) + ldb.add({'dn': enroll_dn, + 'objectClass': 'pKIEnrollmentService', + 'cACertificate': dummy_certificate(), + 'certificateTemplates': ['Machine'], + 'dNSHostName': hostname, + }) + # Write the dummy pKICertificateTemplate + template_dn = 'CN=Machine,CN=Certificate Templates,%s' % confdn + ldb.add({'dn': template_dn, + 'objectClass': 'pKICertificateTemplate', + }) + + with TemporaryDirectory() as dname: + try: + ext.process_group_policy([], gpos, dname, dname) + except Exception as e: + self.fail(str(e)) + + ca_crt = os.path.join(dname, '%s.crt' % ca_cn) + self.assertTrue(os.path.exists(ca_crt), + 'Root CA certificate was not requested') + machine_crt = os.path.join(dname, '%s.Machine.crt' % ca_cn) + self.assertTrue(os.path.exists(machine_crt), + 'Machine certificate was not requested') + machine_key = os.path.join(dname, '%s.Machine.key' % ca_cn) + self.assertTrue(os.path.exists(machine_key), + 'Machine key was not generated') + + # Verify RSOP does not fail + ext.rsop([g for g in gpos if g.name == guid][0]) + + # Check that a call to gpupdate --rsop also succeeds + ret = rsop(self.lp) + self.assertEqual(ret, 0, 'gpupdate --rsop failed!') + + # Remove policy + gp_db = store.get_gplog(machine_creds.get_username()) + del_gpos = get_deleted_gpos_list(gp_db, []) + ext.process_group_policy(del_gpos, [], dname) + self.assertFalse(os.path.exists(ca_crt), + 'Root CA certificate was not removed') + self.assertFalse(os.path.exists(machine_crt), + 'Machine certificate was not removed') + self.assertFalse(os.path.exists(machine_key), + 'Machine key was not removed') + out, _ = Popen(['getcert', 'list-cas'], stdout=PIPE).communicate() + self.assertNotIn(get_bytes(ca_cn), out, 'CA was not removed') + out, _ = Popen(['getcert', 'list'], stdout=PIPE).communicate() + self.assertNotIn(b'Machine', out, + 'Machine certificate not removed') + self.assertNotIn(b'Workstation', out, + 'Workstation certificate not removed') + + # Remove the dummy CA, pKIEnrollmentService, and pKICertificateTemplate + ldb.delete(certa_dn) + ldb.delete(enroll_dn) + ldb.delete(template_dn) + + # Unstage the Registry.pol file + unstage_file(reg_pol) + def test_gp_cert_auto_enroll_ext(self): local_path = self.lp.cache_path('gpo_cache') guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}' @@ -6919,6 +7031,7 @@ class GPOTests(tests.TestCase): machine_creds.set_machine_account() # Initialize the group policy extension + cae.requests = dummy_requests() ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, machine_creds.get_username(), store) @@ -7517,6 +7630,7 @@ class GPOTests(tests.TestCase): machine_creds.set_machine_account() # Initialize the group policy extension + cae.requests = dummy_requests() ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds, machine_creds.get_username(), store) diff --git a/selftest/knownfail.d/gpo b/selftest/knownfail.d/gpo new file mode 100644 index 00000000000..f1e590bc7d8 --- /dev/null +++ b/selftest/knownfail.d/gpo @@ -0,0 +1 @@ +^samba.tests.gpo.samba.tests.gpo.GPOTests.test_gp_cert_auto_enroll_ext_without_ndes