]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
gpo: Test Certificate Auto Enrollment Policy
authorDavid Mulder <dmulder@samba.org>
Fri, 2 Jul 2021 20:44:43 +0000 (20:44 +0000)
committerJeremy Allison <jra@samba.org>
Thu, 15 Jul 2021 19:13:29 +0000 (19:13 +0000)
Signed-off-by: David Mulder <dmulder@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
python/samba/tests/bin/cepces-submit [new file with mode: 0755]
python/samba/tests/bin/getcert [new file with mode: 0755]
python/samba/tests/bin/sscep [new file with mode: 0755]
python/samba/tests/gpo.py
python/samba/tests/usage.py
source4/selftest/tests.py

diff --git a/python/samba/tests/bin/cepces-submit b/python/samba/tests/bin/cepces-submit
new file mode 100755 (executable)
index 0000000..1f9d57c
--- /dev/null
@@ -0,0 +1,15 @@
+#!/usr/bin/python3
+import optparse
+import os, sys, re
+
+sys.path.insert(0, "bin/python")
+
+if __name__ == "__main__":
+    parser = optparse.OptionParser('cepces-submit [options]')
+    parser.add_option('--server')
+
+    (opts, args) = parser.parse_args()
+    assert opts.server is not None
+    if 'CERTMONGER_OPERATION' in os.environ and \
+       os.environ['CERTMONGER_OPERATION'] == 'GET-SUPPORTED-TEMPLATES':
+        print('Machine') # Report a Machine template
diff --git a/python/samba/tests/bin/getcert b/python/samba/tests/bin/getcert
new file mode 100755 (executable)
index 0000000..93895eb
--- /dev/null
@@ -0,0 +1,84 @@
+#!/usr/bin/python3
+import optparse
+import os, sys, re
+import pickle
+
+sys.path.insert(0, "bin/python")
+
+if __name__ == "__main__":
+    parser = optparse.OptionParser('getcert <cmd> [options]')
+    parser.add_option('-i')
+    parser.add_option('-c')
+    parser.add_option('-T')
+    parser.add_option('-I')
+    parser.add_option('-k')
+    parser.add_option('-f')
+    parser.add_option('-e')
+    parser.add_option('-g')
+
+    (opts, args) = parser.parse_args()
+    assert len(args) == 1
+    assert args[0] in ['add-ca', 'request', 'remove-ca', 'stop-tracking',
+                       'list', 'list-cas']
+
+    # Use a dir we can write to in the testenv
+    if 'LOCAL_PATH' in os.environ:
+        data_dir = os.path.realpath(os.environ.get('LOCAL_PATH'))
+    else:
+        data_dir = os.path.dirname(os.path.realpath(__file__))
+    dump_file = os.path.join(data_dir, 'getcert.dump')
+    if os.path.exists(dump_file):
+        with open(dump_file, 'rb') as r:
+            cas, certs = pickle.load(r)
+    else:
+        cas = {}
+        certs = {}
+    if args[0] == 'add-ca':
+        # Add a fake CA entry
+        assert opts.c not in cas.keys()
+        cas[opts.c] = opts.e
+    elif args[0] == 'remove-ca':
+        # Remove a fake CA entry
+        assert opts.c in cas.keys()
+        del cas[opts.c]
+    elif args[0] == 'list-cas':
+        # List the fake CAs
+        for ca, helper_location in cas.items():
+            print('CA \'%s\':\n\tis-default: no\n\tca-type: EXTERNAL\n' % ca +
+                  '\thelper-location: %s' % helper_location)
+    elif args[0] == 'request':
+        # Add a fake cert request
+        assert opts.c in cas.keys()
+        assert opts.I not in certs.keys()
+        certs[opts.I] = { 'ca': opts.c, 'template': opts.T,
+                          'keyfile': os.path.abspath(opts.k),
+                          'certfile': os.path.abspath(opts.f),
+                          'keysize': opts.g }
+        # Create dummy key and cert (empty files)
+        with open(opts.k, 'w') as w:
+            pass
+        with open(opts.f, 'w') as w:
+            pass
+    elif args[0] == 'stop-tracking':
+        # Remove the fake cert request
+        assert opts.i in certs.keys()
+        del certs[opts.i]
+    elif args[0] == 'list':
+        # List the fake cert requests
+        print('Number of certificates and requests being tracked: %d.' % \
+              len(certs))
+        for rid, data in certs.items():
+            print('Request ID \'%s\':\n\tstatus: MONITORING\n' % rid +
+                  '\tstuck: no\n\tkey pair storage: type=FILE,' +
+                  'location=\'%s\'' % data['keyfile'] + '\n\t' +
+                  'certificate: type=FILE,location=\'%s\'' % data['certfile'] +
+                  '\n\tCA: %s\n\t' % data['ca'] +
+                  'certificate template/profile: %s\n\t' % data['template'] +
+                  'track: yes\n\tauto-renew: yes')
+
+    if len(cas.items()) == 0 and len(certs.items()) == 0:
+        if os.path.exists(dump_file):
+            os.unlink(dump_file)
+    else:
+        with open(dump_file, 'wb') as w:
+            pickle.dump((cas, certs), w)
diff --git a/python/samba/tests/bin/sscep b/python/samba/tests/bin/sscep
new file mode 100755 (executable)
index 0000000..d0d8892
--- /dev/null
@@ -0,0 +1,19 @@
+#!/usr/bin/python3
+import optparse
+import os, sys, re
+
+sys.path.insert(0, "bin/python")
+
+if __name__ == "__main__":
+    parser = optparse.OptionParser('sscep <cmd> [options]')
+    parser.add_option('-F')
+    parser.add_option('-c')
+    parser.add_option('-u')
+
+    (opts, args) = parser.parse_args()
+    assert len(args) == 1
+    assert args[0] == 'getca'
+    assert opts.F == 'sha1'
+    # Create dummy root cert (empty file)
+    with open(opts.c, 'w') as w:
+        pass
index 4df0c23c4564b0db935e763fb8bd57b78c85ef17..b5dc09543ad0c4926aef358210267413a8ac0212 100644 (file)
@@ -38,6 +38,7 @@ from samba.vgp_motd_ext import vgp_motd_ext
 from samba.vgp_issue_ext import vgp_issue_ext
 from samba.vgp_access_ext import vgp_access_ext
 from samba.gp_gnome_settings_ext import gp_gnome_settings_ext
+from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext
 import logging
 from samba.credentials import Credentials
 from samba.gp_msgs_ext import gp_msgs_ext
@@ -51,6 +52,9 @@ import hashlib
 from samba.gp_parse.gp_pol import GPPolParser
 from glob import glob
 from configparser import ConfigParser
+from samba.gpclass import get_dc_hostname
+from samba import Ldb
+from samba.auth import system_session
 
 realm = os.environ.get('REALM')
 policies = realm + '/POLICIES'
@@ -198,6 +202,28 @@ b"""
 </PolFile>
 """
 
+auto_enroll_reg_pol = \
+b"""
+<?xml version="1.0" encoding="utf-8"?>
+<PolFile num_entries="3" signature="PReg" version="1">
+        <Entry type="4" type_name="REG_DWORD">
+                <Key>Software\Policies\Microsoft\Cryptography\AutoEnrollment</Key>
+                <ValueName>AEPolicy</ValueName>
+                <Value>7</Value>
+        </Entry>
+        <Entry type="4" type_name="REG_DWORD">
+                <Key>Software\Policies\Microsoft\Cryptography\AutoEnrollment</Key>
+                <ValueName>OfflineExpirationPercent</ValueName>
+                <Value>10</Value>
+        </Entry>
+        <Entry type="1" type_name="REG_SZ">
+                <Key>Software\Policies\Microsoft\Cryptography\AutoEnrollment</Key>
+                <ValueName>OfflineExpirationStoreNames</ValueName>
+                <Value>MY</Value>
+        </Entry>
+</PolFile>
+"""
+
 def days2rel_nttime(val):
     seconds = 60
     minutes = 60
@@ -1860,3 +1886,101 @@ class GPOTests(tests.TestCase):
 
         # Unstage the Registry.pol file
         unstage_file(reg_pol)
+
+    def test_gp_cert_auto_enroll_ext(self):
+        local_path = self.lp.cache_path('gpo_cache')
+        guid = '{31B2F340-016D-11D2-945F-00C04FB984F9}'
+        reg_pol = os.path.join(local_path, policies, guid,
+                               'MACHINE/REGISTRY.POL')
+        logger = logging.getLogger('gpo_tests')
+        cache_dir = self.lp.get('cache directory')
+        store = GPOStorage(os.path.join(cache_dir, 'gpo.tdb'))
+
+        machine_creds = Credentials()
+        machine_creds.guess(self.lp)
+        machine_creds.set_machine_account()
+
+        # Initialize the group policy extension
+        ext = gp_cert_auto_enroll_ext(logger, self.lp, machine_creds, store)
+
+        ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
+        if ads.connect():
+            gpos = ads.get_gpo_list(machine_creds.get_username())
+
+        # Stage the Registry.pol file with test data
+        parser = GPPolParser()
+        parser.load_xml(etree.fromstring(auto_enroll_reg_pol.strip()))
+        ret = stage_file(reg_pol, ndr_pack(parser.pol_file))
+        self.assertTrue(ret, 'Could not create the target %s' % reg_pol)
+
+        # Write the dummy CA entry, Enrollment Services, and Templates Entries
+        admin_creds = Credentials()
+        admin_creds.set_username(os.environ.get('DC_USERNAME'))
+        admin_creds.set_password(os.environ.get('DC_PASSWORD'))
+        admin_creds.set_realm(os.environ.get('REALM'))
+        hostname = get_dc_hostname(machine_creds, self.lp)
+        url = 'ldap://%s' % hostname
+        ldb = Ldb(url=url, session_info=system_session(),
+                  lp=self.lp, credentials=admin_creds)
+        # Write the dummy CA
+        confdn = 'CN=Public Key Services,CN=Services,CN=Configuration,%s' % base_dn
+        ca_cn = '%s-CA' % hostname.replace('.', '-')
+        certa_dn = 'CN=%s,CN=Certification Authorities,%s' % (ca_cn, confdn)
+        ldb.add({'dn': certa_dn,
+                 'objectClass': 'certificationAuthority',
+                 'authorityRevocationList': ['XXX'],
+                 'cACertificate': 'XXX',
+                 'certificateRevocationList': ['XXX'],
+                })
+        # Write the dummy pKIEnrollmentService
+        enroll_dn = 'CN=%s,CN=Enrollment Services,%s' % (ca_cn, confdn)
+        ldb.add({'dn': enroll_dn,
+                 'objectClass': 'pKIEnrollmentService',
+                 'cACertificate': 'XXXX',
+                 'certificateTemplates': ['Machine'],
+                 'dNSHostName': hostname,
+                })
+        # Write the dummy pKICertificateTemplate
+        template_dn = 'CN=Machine,CN=Certificate Templates,%s' % confdn
+        ldb.add({'dn': template_dn,
+                 'objectClass': 'pKICertificateTemplate',
+                })
+
+        with TemporaryDirectory() as dname:
+            ext.process_group_policy([], gpos, dname, dname)
+            ca_crt = os.path.join(dname, '%s.crt' % ca_cn)
+            self.assertTrue(os.path.exists(ca_crt),
+                            'Root CA certificate was not requested')
+            machine_crt = os.path.join(dname, '%s.Machine.crt' % ca_cn)
+            self.assertTrue(os.path.exists(machine_crt),
+                            'Machine certificate was not requested')
+            machine_key = os.path.join(dname, '%s.Machine.key' % ca_cn)
+            self.assertTrue(os.path.exists(machine_crt),
+                            'Machine key was not generated')
+
+            # Verify RSOP does not fail
+            ext.rsop([g for g in gpos if g.name == guid][0])
+
+            # Remove policy
+            gp_db = store.get_gplog(machine_creds.get_username())
+            del_gpos = get_deleted_gpos_list(gp_db, [])
+            ext.process_group_policy(del_gpos, [], dname)
+            self.assertFalse(os.path.exists(ca_crt),
+                            'Root CA certificate was not removed')
+            self.assertFalse(os.path.exists(machine_crt),
+                            'Machine certificate was not removed')
+            self.assertFalse(os.path.exists(machine_crt),
+                            'Machine key was not removed')
+            out, _ = Popen(['getcert', 'list-cas'], stdout=PIPE).communicate()
+            self.assertNotIn(get_bytes(ca_cn), out, 'CA was not removed')
+            out, _ = Popen(['getcert', 'list'], stdout=PIPE).communicate()
+            self.assertNotIn(b'Machine', out,
+                             'Machine certificate not removed')
+
+        # Remove the dummy CA, pKIEnrollmentService, and pKICertificateTemplate
+        ldb.delete(certa_dn)
+        ldb.delete(enroll_dn)
+        ldb.delete(template_dn)
+
+        # Unstage the Registry.pol file
+        unstage_file(reg_pol)
index 27497e069d1c27fbf030ce799dfe91317aa0f8d0..2cc9e520d295f7c21625d6daf60c1fbad877bbfd 100644 (file)
@@ -119,6 +119,7 @@ EXCLUDE_DIRS = {
     'bin/python/samba/tests',
     'bin/python/samba/tests/dcerpc',
     'bin/python/samba/tests/krb5',
+    'python/samba/tests/bin',
 }
 
 
index 8938754d0fcf5558ba2899502c72aac989647127..8ca8649bda392cf50568f61b831d1036b935a9be 100755 (executable)
@@ -846,7 +846,9 @@ planpythontestsuite("chgdcpass:local", "samba.tests.samba_tool.dnscmd")
 planpythontestsuite("chgdcpass:local", "samba.tests.dcerpc.rpcecho")
 
 planoldpythontestsuite("nt4_dc", "samba.tests.netbios", extra_args=['-U"$USERNAME%$PASSWORD"'])
-planoldpythontestsuite("ad_dc:local", "samba.tests.gpo", extra_args=['-U"$USERNAME%$PASSWORD"'])
+test_bin = os.path.abspath(os.path.join(os.getenv('BINDIR', './bin'), '../python/samba/tests/bin'))
+planoldpythontestsuite("ad_dc:local", "samba.tests.gpo", extra_args=['-U"$USERNAME%$PASSWORD"'],
+                       environ={'PATH':':'.join([test_bin, os.getenv('PATH', '')])})
 planoldpythontestsuite("ad_member", "samba.tests.gpo_member", extra_args=['-U"$USERNAME%$PASSWORD"'])
 planoldpythontestsuite("ad_dc:local", "samba.tests.dckeytab", extra_args=['-U"$USERNAME%$PASSWORD"'])