import struct
from typing import Any, Optional, cast
import dns
+import dns.message
import async_timeout
from aioquic.quic.configuration import QuicConfiguration
try:
async with async_timeout.timeout(timeout):
answer = await client.query(query)
- return answer
+ return (answer, client._quic.tls._peer_certificate.serial_number)
except asyncio.TimeoutError as e:
- return e
+ return (e, None)
class StreamResetError(Exception):
def __init__(self, error, message="Stream reset by peer"):
configuration = QuicConfiguration(alpn_protocols=["doq"], is_client=True)
if verify:
configuration.load_verify_locations(verify)
- result = asyncio.run(
+ (result, serial) = asyncio.run(
async_quic_query(
configuration=configuration,
host=host,
raise StreamResetError(result.error_code)
if (isinstance(result, asyncio.TimeoutError)):
raise TimeoutError()
- return result
+ return (result, serial)
def quic_bogus_query(query, host='127.0.0.1', timeout=2, port=853, verify=None, server_hostname=None):
configuration = QuicConfiguration(alpn_protocols=["doq"], is_client=True)
if verify:
configuration.load_verify_locations(verify)
- result = asyncio.run(
+ (result, _) = asyncio.run(
async_quic_query(
configuration=configuration,
host=host,
#!/usr/bin/env python
+import base64
import dns
import clientsubnetoption
from doqclient import quic_bogus_query
from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
import doqclient
+from doqclient import quic_query
class TestDOQBogus(DNSDistTest):
_serverKey = 'server.key'
addDOQLocal("127.0.0.1:%d", "%s", "%s")
"""
_config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
- _verboseMode = True
def testDOQBogus(self):
"""
addDOQLocal("127.0.0.1:%d", "%s", "%s")
"""
_config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
- _verboseMode = True
def getQUICConnection(self):
return self.getDOQConnection(self._doqServerPort, self._caCert)
getPool(""):setCache(pc)
"""
_config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
- _verboseMode = True
def getQUICConnection(self):
return self.getDOQConnection(self._doqServerPort, self._caCert)
addDOQLocal("127.0.0.1:%d", "%s", "%s")
"""
_config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
- _verboseMode = True
def getQUICConnection(self):
return self.getDOQConnection(self._doqServerPort, self._caCert)
def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
+
+class TestDOQCertificateReloading(DNSDistTest):
+ _consoleKey = DNSDistTest.generateConsoleKey()
+ _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
+ _serverKey = 'server-doq.key'
+ _serverCert = 'server-doq.chain'
+ _serverName = 'tls.tests.dnsdist.org'
+ _caCert = 'ca.pem'
+ _doqServerPort = pickAvailablePort()
+ _config_template = """
+ setKey("%s")
+ controlSocket("127.0.0.1:%s")
+
+ newServer{address="127.0.0.1:%d"}
+
+ addDOQLocal("127.0.0.1:%d", "%s", "%s")
+ """
+ _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
+
+ @classmethod
+ def setUpClass(cls):
+ cls.generateNewCertificateAndKey('server-doq')
+ cls.startResponders()
+ cls.startDNSDist()
+ cls.setUpSockets()
+
+ def testCertificateReloaded(self):
+ name = 'certificate-reload.doq.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+ query.id = 0
+ (_, serial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
+
+ self.generateNewCertificateAndKey('server-doq')
+ self.sendConsoleCommand("reloadAllCertificates()")
+
+ (_, secondSerial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
+ # check that the serial is different
+ self.assertNotEqual(serial, secondSerial)