#!/usr/bin/env python
import base64
+import socket
import time
import dns
import dns.message
# generate a new certificate
self.sendConsoleCommand("generateDNSCryptCertificate('DNSCryptProviderPrivate.key', 'DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 1, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
- # switch to that new certificate
+ # add that new certificate
self.sendConsoleCommand("getDNSCryptBind(0):loadNewCertificate('DNSCryptResolver.cert.2', 'DNSCryptResolver.key.2')")
- oldSerial = self.sendConsoleCommand("getDNSCryptBind(0):getOldCertificate():getSerial()")
+ oldSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(0):getSerial()")
self.assertEquals(int(oldSerial), self._resolverCertificateSerial)
- effectiveSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getSerial()")
+ effectiveSerial = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getSerial()")
self.assertEquals(int(effectiveSerial), self._resolverCertificateSerial + 1)
- tsStart = self.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getTSStart()")
+ tsStart = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSStart()")
self.assertEquals(int(tsStart), self._resolverCertificateValidFrom)
- tsEnd = self.sendConsoleCommand("getDNSCryptBind(0):getCurrentCertificate():getTSEnd()")
+ tsEnd = self.sendConsoleCommand("getDNSCryptBind(0):getCertificate(1):getTSEnd()")
self.assertEquals(int(tsEnd), self._resolverCertificateValidUntil)
# we should still be able to send queries with the previous certificate
cert = client.getResolverCertificate()
self.assertTrue(cert)
self.assertEquals(cert.serial, self._resolverCertificateSerial + 1)
+ # we should still get the old ones
+ certs = client.getAllResolverCertificates(True)
+ self.assertEquals(len(certs), 2)
+ self.assertEquals(certs[0].serial, self._resolverCertificateSerial)
+ self.assertEquals(certs[1].serial, self._resolverCertificateSerial + 1)
# generate a third certificate, this time in memory
self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 2, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
cert = client.getResolverCertificate()
self.assertTrue(cert)
self.assertEquals(cert.serial, self._resolverCertificateSerial + 2)
+ # we should still get the old ones
+ certs = client.getAllResolverCertificates(True)
+ self.assertEquals(len(certs), 3)
+ self.assertEquals(certs[0].serial, self._resolverCertificateSerial)
+ self.assertEquals(certs[1].serial, self._resolverCertificateSerial + 1)
+ self.assertEquals(certs[2].serial, self._resolverCertificateSerial + 2)
+
+ # generate a fourth certificate, still in memory
+ self.sendConsoleCommand("getDNSCryptBind(0):generateAndLoadInMemoryCertificate('DNSCryptProviderPrivate.key', {!s}, {:.0f}, {:.0f})".format(self._resolverCertificateSerial + 3, self._resolverCertificateValidFrom, self._resolverCertificateValidUntil))
+
+ # mark the old ones as inactive
+ self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self._resolverCertificateSerial))
+ self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self._resolverCertificateSerial + 1))
+ self.sendConsoleCommand("getDNSCryptBind(0):markInactive({!s})".format(self._resolverCertificateSerial + 2))
+ # we should still be able to send queries with the third one
+ self.doDNSCryptQuery(client, query, response, False)
+ self.doDNSCryptQuery(client, query, response, True)
+ cert = client.getResolverCertificate()
+ self.assertTrue(cert)
+ self.assertEquals(cert.serial, self._resolverCertificateSerial + 2)
+ # now remove them
+ self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self._resolverCertificateSerial))
+ self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self._resolverCertificateSerial + 1))
+ self.sendConsoleCommand("getDNSCryptBind(0):removeInactiveCertificate({!s})".format(self._resolverCertificateSerial + 2))
+
+ # we should not be able to send with the old ones anymore
+ try:
+ data = client.query(query.to_wire())
+ except socket.timeout:
+ data = None
+ self.assertEquals(data, None)
+
+ # refreshing should get us the fourth one
+ client.refreshResolverCertificates()
+ cert = client.getResolverCertificate()
+ self.assertTrue(cert)
+ self.assertEquals(cert.serial, self._resolverCertificateSerial + 3)
+ # and only that one
+ certs = client.getAllResolverCertificates(True)
+ self.assertEquals(len(certs), 1)
+ # and we should be able to query with it
+ self.doDNSCryptQuery(client, query, response, False)
+ self.doDNSCryptQuery(client, query, response, True)
+ cert = client.getResolverCertificate()
+ self.assertTrue(cert)
+ self.assertEquals(cert.serial, self._resolverCertificateSerial + 3)
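Review bot: Between the three `markInactive(...)` calls and the `removeInactiveCertificate(...)` calls, the test only checks that queries still decrypt with serial + 2; it never checks what the bind advertises while those certificates are inactive. If inactive certificates are meant to stay usable but stop being handed out (inferred from the name, the implementation is not in this hunk), a regression that keeps advertising keys about to be withdrawn would pass unnoticed, since the final `len(certs) == 1` check runs only after removal. Consider fetching `certs = client.getAllResolverCertificates(True)` right after the `markInactive` block and asserting `self.assertEquals(len(certs), 1)` and `self.assertEquals(certs[0].serial, self._resolverCertificateSerial + 3)`. Use a second client for this if that call changes the certificate the first client has selected, because the later negative check needs it to keep serial + 2. Separately, the `except socket.timeout` negative check assumes the client socket has a timeout set; that setup, like the start of the test method, is outside this hunk.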
class TestDNSCryptWithCache(DNSCryptTest):
_config_template = """
generateDNSCryptCertificate("DNSCryptProviderPrivate.key", "DNSCryptResolver.cert", "DNSCryptResolver.key", %d, %d, %d)
addDNSCryptBind("127.0.0.1:%d", "%s", "DNSCryptResolver.cert", "DNSCryptResolver.key")
- pc = newPacketCache(5, 86400, 1)
+ pc = newPacketCache(5, {maxTTL=86400, minTTL=1})
getPool(""):setCache(pc)
newServer{address="127.0.0.1:%s"}
"""