with open(inFileName) as inFile:
outFile.write(inFile.read())
+ cmd = ['openssl', 'pkcs12', '-export', '-passout', 'pass:passw0rd', '-clcerts', '-in', 'server.pem', '-CAfile', 'ca.pem', '-inkey', 'server.key', '-out', 'server.p12']
+ output = None
+ try:
+ process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+ output = process.communicate(input='')
+ except subprocess.CalledProcessError as exc:
+ raise AssertionError('openssl pkcs12 failed (%d): %s' % (exc.returncode, exc.output))
+
def checkMessageProxyProtocol(self, receivedProxyPayload, source, destination, isTCP, values=[], v6=False, sourcePort=None, destinationPort=None):
proxy = ProxyProtocol()
self.assertTrue(proxy.parseHeader(receivedProxyPayload))
openssl x509 -req -days 1 -CA ca.pem -CAkey ca.key -CAcreateserial -in server.csr -out server.pem -extfile configServer.conf -extensions v3_req
# Generate a chain
cat server.pem ca.pem > server.chain
+# Generate a password-protected PKCS12 file
+openssl pkcs12 -export -passout pass:passw0rd -clcerts -in server.pem -CAfile ca.pem -inkey server.key -out server.p12
out=$(mktemp)
set -o pipefail
self.assertEqual(expectedQuery, receivedQuery)
self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
self.assertEqual(response, receivedResponse)
+
+class TestDOHWithPCKS12Cert(DNSDistDOHTest):
+ _serverCert = 'server.p12'
+ _pkcs12Password = 'passw0rd'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _dohServerPort = 8443
+ _dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
+ _config_template = """
+ newServer{address="127.0.0.1:%s"}
+ cert=newTLSCertificate("%s", {password="%s"})
+ addDOHLocal("127.0.0.1:%s", cert, "", { "/" })
+ """
+ _config_params = ['_testServerPort', '_serverCert', '_pkcs12Password', '_dohServerPort']
+
+ def testProtocolDOH(self):
+ """
+ DoH: Test Simple DOH Query with a password protected PCKS12 file configured
+ """
+ name = 'simple.doh.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ query.id = 0
+ expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+ expectedQuery.id = 0
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = expectedQuery.id
+ self.assertEqual(expectedQuery, receivedQuery)
receivedQuery.id = query.id
self.assertEqual(query, receivedQuery)
self.assertEqual(response, receivedResponse)
+
+class TestPKCSTLSCertificate(DNSDistTest, TLSTests):
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _serverCert = 'server.p12'
+ _pkcsPassphrase = 'passw0rd'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _tlsServerPort = 8453
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+ newServer{address="127.0.0.1:%s"}
+ cert=newTLSCertificate("%s", {password="%s"})
+ addTLSLocal("127.0.0.1:%s", cert, "", { provider="openssl" })
+ addAction(SNIRule("powerdns.com"), SpoofAction("1.2.3.4"))
+ """
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_serverCert', '_pkcsPassphrase', '_tlsServerPort']