import os
import operator
+import requests
from samba.gpclass import gp_pol_ext
from samba import Ldb
from ldb import SCOPE_SUBTREE, SCOPE_BASE
from shutil import which
from subprocess import Popen, PIPE
import re
-from glob import glob
import json
from samba.gp.util.logging import log
import struct
+try:
+ from cryptography.hazmat.primitives.serialization.pkcs7 import \
+ load_der_pkcs7_certificates
+except ModuleNotFoundError:
+ def load_der_pkcs7_certificates(x): return []
+ log.error('python cryptography missing pkcs7 support. '
+ 'Certificate chain parsing will fail')
+from cryptography.hazmat.primitives.serialization import Encoding
+from cryptography.x509 import load_der_x509_certificate
+from cryptography.hazmat.backends import default_backend
cert_wrap = b"""
-----BEGIN CERTIFICATE-----
return out.strip().split()
return []
+
+def getca(ca_name, url, trust_dir):
+ """Fetch Certificate Chain from the CA."""
+ root_cert = os.path.join(trust_dir, '%s.crt' % ca_name)
+ root_certs = []
+
+ try:
+ r = requests.get(url=url, params={'operation': 'GetCACert',
+ 'message': 'CAIdentifier'})
+ except requests.exceptions.ConnectionError:
+ log.warn('Failed to establish a new connection')
+ r = None
+ if r is None or r.content == b'':
+ log.warn('Failed to fetch the root certificate chain.')
+ log.warn('Ensure you have installed and configured the'
+ ' Network Device Enrollment Service.')
+ return root_certs
+
+ if r.headers['Content-Type'] == 'application/x-x509-ca-cert':
+ # Older versions of load_der_x509_certificate require a backend param
+ try:
+ cert = load_der_x509_certificate(r.content)
+ except TypeError:
+ cert = load_der_x509_certificate(r.content, default_backend())
+ cert_data = cert.public_bytes(Encoding.PEM)
+ with open(root_cert, 'wb') as w:
+ w.write(cert_data)
+ root_certs.append(root_cert)
+ elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert':
+ certs = load_der_pkcs7_certificates(r.content)
+ for i in range(0, len(certs)):
+ cert = certs[i].public_bytes(Encoding.PEM)
+ dest = '%s.%d' % (root_cert, i)
+ with open(dest, 'wb') as w:
+ w.write(cert)
+ root_certs.append(dest)
+ else:
+ log.warn('getca: Wrong (or missing) MIME content type')
+
+ return root_certs
+
+
def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
# Install the root certificate chain
data = {'files': [], 'templates': []}
- sscep = which('sscep')
- if sscep is not None:
- url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % \
- ca['hostname']
- root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
- ret = Popen([sscep, 'getca', '-F', 'sha1', '-c',
- root_cert, '-u', url]).wait()
- if ret != 0:
- log.warn('sscep failed to fetch the root certificate chain.')
- log.warn('Ensure you have installed and configured the' +
- ' Network Device Enrollment Service.')
- root_certs = glob('%s*' % root_cert)
- data['files'].extend(root_certs)
- for src in root_certs:
- # Symlink the certs to global trust dir
- dst = os.path.join(global_trust_dir, os.path.basename(src))
- try:
- os.symlink(src, dst)
- data['files'].append(dst)
- except PermissionError:
- log.warn('Failed to symlink root certificate to the' +
- ' admin trust anchors')
- except FileNotFoundError:
- log.warn('Failed to symlink root certificate to the' +
- ' admin trust anchors.' +
- ' The directory was not found', global_trust_dir)
- except FileExistsError:
- # If we're simply downloading a renewed cert, the symlink
- # already exists. Ignore the FileExistsError. Preserve the
- # existing symlink in the unapply data.
- data['files'].append(dst)
- else:
- log.warn('sscep is not installed, which prevents the installation' +
- ' of the root certificate chain.')
+ url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
+ root_certs = getca(ca['name'], url, trust_dir)
+ data['files'].extend(root_certs)
+ for src in root_certs:
+ # Symlink the certs to global trust dir
+ dst = os.path.join(global_trust_dir, os.path.basename(src))
+ try:
+ os.symlink(src, dst)
+ data['files'].append(dst)
+ except PermissionError:
+ log.warn('Failed to symlink root certificate to the'
+ ' admin trust anchors')
+ except FileNotFoundError:
+ log.warn('Failed to symlink root certificate to the'
+ ' admin trust anchors.'
+ ' The directory was not found', global_trust_dir)
+ except FileExistsError:
+ # If we're simply downloading a renewed cert, the symlink
+ # already exists. Ignore the FileExistsError. Preserve the
+ # existing symlink in the unapply data.
+ data['files'].append(dst)
update = which('update-ca-certificates')
if update is not None:
Popen([update]).wait()
from samba.vgp_issue_ext import vgp_issue_ext
from samba.vgp_access_ext import vgp_access_ext
from samba.gp_gnome_settings_ext import gp_gnome_settings_ext
-from samba.gp_cert_auto_enroll_ext import gp_cert_auto_enroll_ext, \
- octet_string_to_objectGUID
+from samba import gp_cert_auto_enroll_ext as cae
from samba.gp_firefox_ext import gp_firefox_ext
from samba.gp_chromium_ext import gp_chromium_ext
from samba.gp_firewalld_ext import gp_firewalld_ext
from samba.auth import system_session
import json
from shutil import which
+import requests
+from cryptography import x509
+from cryptography.hazmat.primitives import hashes
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.primitives.serialization import Encoding
+from datetime import datetime, timedelta
+
+def dummy_certificate():
+ name = x509.Name([
+ x509.NameAttribute(x509.NameOID.COMMON_NAME,
+ os.environ.get('SERVER'))
+ ])
+ cons = x509.BasicConstraints(ca=True, path_length=0)
+ now = datetime.utcnow()
+
+ key = rsa.generate_private_key(public_exponent=65537, key_size=2048,
+ backend=default_backend())
+
+ cert = (
+ x509.CertificateBuilder()
+ .subject_name(name)
+ .issuer_name(name)
+ .public_key(key.public_key())
+ .serial_number(1000)
+ .not_valid_before(now)
+ .not_valid_after(now + timedelta(seconds=300))
+ .add_extension(cons, False)
+ .sign(key, hashes.SHA256(), default_backend())
+ )
+
+ return cert.public_bytes(encoding=Encoding.DER)
+
+# Dummy requests structure for Certificate Auto Enrollment
+class dummy_requests(object):
+ @staticmethod
+ def get(url=None, params=None):
+ dummy = requests.Response()
+ dummy._content = dummy_certificate()
+ dummy.headers = {'Content-Type': 'application/x-x509-ca-cert'}
+ return dummy
+
+ class exceptions(object):
+ ConnectionError = Exception
+cae.requests = dummy_requests
realm = os.environ.get('REALM')
policies = realm + '/POLICIES'
machine_creds.set_machine_account()
# Initialize the group policy extension
- ext = gp_cert_auto_enroll_ext(self.lp, machine_creds,
- machine_creds.get_username(), store)
+ ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
+ machine_creds.get_username(), store)
ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
if ads.connect():
machine_creds.set_machine_account()
# Initialize the group policy extension
- ext = gp_cert_auto_enroll_ext(self.lp, machine_creds,
- machine_creds.get_username(), store)
+ ext = cae.gp_cert_auto_enroll_ext(self.lp, machine_creds,
+ machine_creds.get_username(), store)
ads = gpo.ADS_STRUCT(self.server, self.lp, machine_creds)
if ads.connect():
_ldb.SCOPE_BASE, '(objectClass=*)', ['objectGUID'])
self.assertTrue(len(res2) == 1, 'objectGUID not found')
objectGUID = b'{%s}' % \
- octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode()
+ cae.octet_string_to_objectGUID(res2[0]['objectGUID'][0]).upper().encode()
parser = GPPolParser()
parser.load_xml(etree.fromstring(advanced_enroll_reg_pol.strip() % \
(objectGUID, objectGUID, objectGUID, objectGUID)))