import os
import operator
-import requests
from samba.gp.gpclass import gp_pol_ext, gp_applier, GPOSTATE
from samba import Ldb
from samba.dcerpc import misc
return out.strip().split()
-def getca(ca, url, trust_dir):
- """Fetch Certificate Chain from the CA."""
+def getca(ca, trust_dir):
+ """Fetch a certificate from LDAP."""
root_cert = os.path.join(trust_dir, '%s.crt' % ca['name'])
root_certs = []
-
- try:
- r = requests.get(url=url, params={'operation': 'GetCACert',
- 'message': 'CAIdentifier'})
- except requests.exceptions.ConnectionError:
- log.warn('Could not connect to Network Device Enrollment Service.')
- r = None
- if r is None or r.content == b'' or r.headers['Content-Type'] == 'text/html':
- log.warn('Unable to fetch root certificates (requires NDES).')
- if 'cACertificate' in ca:
- log.warn('Installing the server certificate only.')
- der_certificate = base64.b64decode(ca['cACertificate'])
- try:
- cert = load_der_x509_certificate(der_certificate)
- except TypeError:
- cert = load_der_x509_certificate(der_certificate,
- default_backend())
- cert_data = cert.public_bytes(Encoding.PEM)
- with open(root_cert, 'wb') as w:
- w.write(cert_data)
- root_certs.append(root_cert)
- return root_certs
-
- if r.headers['Content-Type'] == 'application/x-x509-ca-cert':
- # Older versions of load_der_x509_certificate require a backend param
+ if 'cACertificate' in ca:
+ log.warn('Installing the server certificate only.')
+ der_certificate = base64.b64decode(ca['cACertificate'])
try:
- cert = load_der_x509_certificate(r.content)
+ cert = load_der_x509_certificate(der_certificate)
except TypeError:
- cert = load_der_x509_certificate(r.content, default_backend())
+ cert = load_der_x509_certificate(der_certificate,
+ default_backend())
cert_data = cert.public_bytes(Encoding.PEM)
with open(root_cert, 'wb') as w:
w.write(cert_data)
root_certs.append(root_cert)
- elif r.headers['Content-Type'] == 'application/x-x509-ca-ra-cert':
- certs = load_der_pkcs7_certificates(r.content)
- for i in range(0, len(certs)):
- cert = certs[i].public_bytes(Encoding.PEM)
- filename, extension = root_cert.rsplit('.', 1)
- dest = '%s.%d.%s' % (filename, i, extension)
- with open(dest, 'wb') as w:
- w.write(cert)
- root_certs.append(dest)
- else:
- log.warn('getca: Wrong (or missing) MIME content type')
-
return root_certs
-
def find_global_trust_dir():
"""Return the global trust dir using known paths from various Linux distros."""
for trust_dir in global_trust_dirs:
def cert_enroll(ca, ldb, trust_dir, private_dir, auth='Kerberos'):
"""Install the root certificate chain."""
data = dict({'files': [], 'templates': []}, **ca)
- url = 'http://%s/CertSrv/mscep/mscep.dll/pkiclient.exe?' % ca['hostname']
log.info("Try to get root or server certificates")
- root_certs = getca(ca, url, trust_dir)
+ root_certs = getca(ca, trust_dir)
data['files'].extend(root_certs)
global_trust_dir = find_global_trust_dir()
for src in root_certs: