test_dnsdist(){
run "cd regression-tests.dnsdist"
- run "DNSDISTBIN=$HOME/dnsdist/bin/dnsdist ./runtests -v --ignore-files='(?:^\.|^_,|^setup\.py$|^test_DOH\.py$)'"
+ run "DNSDISTBIN=$HOME/dnsdist/bin/dnsdist ./runtests -v --ignore-files='(?:^\.|^_,|^setup\.py$|^test_DOH\.py$|^test_OCSP\.py$)'"
run "rm -f ./DNSCryptResolver.cert ./DNSCryptResolver.key"
run "cd .."
}
#include "protobuf.hh"
#include "sodcrypto.hh"
+#ifdef HAVE_LIBSSL
+#include "libssl.hh"
+#endif
+
#include <boost/logic/tribool.hpp>
#include <boost/lexical_cast.hpp>
});
g_lua.writeFunction("setAllowEmptyResponse", [](bool allow) { g_allowEmptyResponse=allow; });
+
+#if defined(HAVE_LIBSSL) && defined(HAVE_OCSP_BASIC_SIGN)
+ g_lua.writeFunction("generateOCSPResponse", [](const std::string& certFile, const std::string& caCert, const std::string& caKey, const std::string& outFile, int ndays, int nmin) {
+ return libssl_generate_ocsp_response(certFile, caCert, caKey, outFile, ndays, nmin);
+ });
+#endif /* HAVE_LIBSSL && HAVE_OCSP_BASIC_SIGN*/
}
vector<std::function<void(void)>> setupLua(bool client, const std::string& config)
return EVP_PKEY_base_id(pkey);
}
+#ifdef HAVE_OCSP_BASIC_SIGN
+bool libssl_generate_ocsp_response(const std::string& certFile, const std::string& caCert, const std::string& caKey, const std::string& outFile, int ndays, int nmin)
+{
+ const EVP_MD* rmd = EVP_sha256();
+
+ auto fp = std::unique_ptr<FILE, int(*)(FILE*)>(fopen(certFile.c_str(), "r"), fclose);
+ if (!fp) {
+ throw std::runtime_error("Unable to open '" + certFile + "' when loading the certificate to generate an OCSP response");
+ }
+ auto cert = std::unique_ptr<X509, void(*)(X509*)>(PEM_read_X509_AUX(fp.get(), nullptr, nullptr, nullptr), X509_free);
+
+ fp = std::unique_ptr<FILE, int(*)(FILE*)>(fopen(caCert.c_str(), "r"), fclose);
+ if (!fp) {
+ throw std::runtime_error("Unable to open '" + caCert + "' when loading the issuer certificate to generate an OCSP response");
+ }
+ auto issuer = std::unique_ptr<X509, void(*)(X509*)>(PEM_read_X509_AUX(fp.get(), nullptr, nullptr, nullptr), X509_free);
+ fp = std::unique_ptr<FILE, int(*)(FILE*)>(fopen(caKey.c_str(), "r"), fclose);
+ if (!fp) {
+ throw std::runtime_error("Unable to open '" + caKey + "' when loading the issuer key to generate an OCSP response");
+ }
+ auto issuerKey = std::unique_ptr<EVP_PKEY, void(*)(EVP_PKEY*)>(PEM_read_PrivateKey(fp.get(), nullptr, nullptr, nullptr), EVP_PKEY_free);
+ fp.reset();
+
+ auto bs = std::unique_ptr<OCSP_BASICRESP, void(*)(OCSP_BASICRESP*)>(OCSP_BASICRESP_new(), OCSP_BASICRESP_free);
+ auto thisupd = std::unique_ptr<ASN1_TIME, void(*)(ASN1_TIME*)>(X509_gmtime_adj(nullptr, 0), ASN1_TIME_free);
+ auto nextupd = std::unique_ptr<ASN1_TIME, void(*)(ASN1_TIME*)>(X509_time_adj_ex(nullptr, ndays, nmin * 60, nullptr), ASN1_TIME_free);
+
+ auto cid = std::unique_ptr<OCSP_CERTID, void(*)(OCSP_CERTID*)>(OCSP_cert_to_id(rmd, cert.get(), issuer.get()), OCSP_CERTID_free);
+ OCSP_basic_add1_status(bs.get(), cid.get(), V_OCSP_CERTSTATUS_GOOD, 0, nullptr, thisupd.get(), nextupd.get());
+
+ if (OCSP_basic_sign(bs.get(), issuer.get(), issuerKey.get(), rmd, nullptr, OCSP_NOCERTS) != 1) {
+ throw std::runtime_error("Error while signing the OCSP response");
+ }
+
+ auto resp = std::unique_ptr<OCSP_RESPONSE, void(*)(OCSP_RESPONSE*)>(OCSP_response_create(OCSP_RESPONSE_STATUS_SUCCESSFUL, bs.get()), OCSP_RESPONSE_free);
+ auto bio = std::unique_ptr<BIO, void(*)(BIO*)>(BIO_new_file(outFile.c_str(), "wb"), BIO_vfree);
+ if (!bio) {
+ throw std::runtime_error("Error opening file for writing the OCSP response");
+ }
+
+ // i2d_OCSP_RESPONSE_bio(bio.get(), resp.get()) is unusable from C++ because of an invalid cast
+ ASN1_i2d_bio((i2d_of_void*)i2d_OCSP_RESPONSE, bio.get(), (unsigned char*)resp.get());
+
+ return true;
+}
+#endif /* HAVE_OCSP_BASIC_SIGN */
+
#endif /* HAVE_LIBSSL */
std::map<int, std::string> libssl_load_ocsp_responses(const std::vector<std::string>& ocspFiles, std::vector<int> keyTypes);
int libssl_get_last_key_type(std::unique_ptr<SSL_CTX, void(*)(SSL_CTX*)>& ctx);
+#ifdef HAVE_OCSP_BASIC_SIGN
+bool libssl_generate_ocsp_response(const std::string& certFile, const std::string& caCert, const std::string& caKey, const std::string& outFile, int ndays, int nmin);
+#endif
+
#endif /* HAVE_LIBSSL */
save_CFLAGS=$CFLAGS
save_LIBS=$LIBS
CFLAGS="$LIBSSL_CFLAGS $CFLAGS"
- LIBS="$LIBSSL_LIBS $LIBS"
- AC_CHECK_FUNCS([SSL_CTX_set_ciphersuites])
+ LIBS="$LIBSSL_LIBS -lcrypto $LIBS"
+ AC_CHECK_FUNCS([SSL_CTX_set_ciphersuites OCSP_basic_sign])
CFLAGS=$save_CFLAGS
LIBS=$save_LIBS
/server.csr
/server.key
/server.pem
+/server.ocsp
/configs
\ No newline at end of file
set -x
fi
-rm -f ca.key ca.pem ca.srl server.csr server.key server.pem server.chain
+rm -f ca.key ca.pem ca.srl server.csr server.key server.pem server.chain server.ocsp
rm -rf configs/*
# Generate a new CA
false
fi
-rm ca.key ca.pem ca.srl server.csr server.key server.pem server.chain
+rm -f ca.key ca.pem ca.srl server.csr server.key server.pem server.chain server.ocsp
--- /dev/null
+#!/usr/bin/env python
+import dns
+import subprocess
+from dnsdisttests import DNSDistTest
+
+class DNSDistOCSPStaplingTest(DNSDistTest):
+
+ @classmethod
+ def checkOCSPStaplingStatus(cls, addr, port, serverName, caFile):
+ testcmd = ['openssl', 's_client', '-CAfile', caFile, '-connect', '%s:%d' % (addr, port), '-status', '-servername', serverName ]
+ output = None
+ try:
+ process = subprocess.Popen(testcmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+ output = process.communicate(input='')
+ except subprocess.CalledProcessError as exc:
+ raise AssertionError('dnsdist --check-config failed (%d): %s' % (exc.returncode, exc.output))
+
+ return output[0].decode()
+
+class TestOCSPStaplingDOH(DNSDistOCSPStaplingTest):
+
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _ocspFile = 'server.ocsp'
+ _caCert = 'ca.pem'
+ _caKey = 'ca.key'
+ _dohServerPort = 8443
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+
+ -- generate an OCSP response file for our certificate, valid one day
+ generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
+ addDOHLocal("127.0.0.1:%s", "%s", "%s", { "/" }, { ocspResponses={"%s"}})
+ """
+ _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_dohServerPort', '_serverCert', '_serverKey', '_ocspFile']
+
+ def testOCSPStapling(self):
+ """
+ OCSP Stapling: DOH
+ """
+ output = self.checkOCSPStaplingStatus('127.0.0.1', self._dohServerPort, self._serverName, self._caCert)
+ self.assertIn('OCSP Response Status: successful (0x0)', output)
+
+class TestOCSPStaplingTLSGnuTLS(DNSDistOCSPStaplingTest):
+
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _ocspFile = 'server.ocsp'
+ _caCert = 'ca.pem'
+ _caKey = 'ca.key'
+ _tlsServerPort = 8443
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+
+ -- generate an OCSP response file for our certificate, valid one day
+ generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
+ addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="gnutls", ocspResponses={"%s"}})
+ """
+ _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+
+ def testOCSPStapling(self):
+ """
+ OCSP Stapling: TLS (GnuTLS)
+ """
+ output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
+ self.assertIn('OCSP Response Status: successful (0x0)', output)
+
+class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
+
+ _serverKey = 'server.key'
+ _serverCert = 'server.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _ocspFile = 'server.ocsp'
+ _caCert = 'ca.pem'
+ _caKey = 'ca.key'
+ _tlsServerPort = 8443
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+
+ -- generate an OCSP response file for our certificate, valid one day
+ generateOCSPResponse('%s', '%s', '%s', '%s', 1, 0)
+ addTLSLocal("127.0.0.1:%s", "%s", "%s", { provider="openssl", ocspResponses={"%s"}})
+ """
+ _config_params = ['_testServerPort', '_serverCert', '_caCert', '_caKey', '_ocspFile', '_tlsServerPort', '_serverCert', '_serverKey', '_ocspFile']
+
+ def testOCSPStapling(self):
+ """
+ OCSP Stapling: TLS (OpenSSL)
+ """
+ output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
+ self.assertIn('OCSP Response Status: successful (0x0)', output)