git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOQ.py
dnsdist: Add regression tests for the new cache-miss rules chain
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOQ.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5
6 from dnsdisttests import DNSDistTest
7 from dnsdisttests import pickAvailablePort
8 from doqclient import quic_bogus_query
9 from quictests import QUICTests, QUICWithCacheTests, QUICACLTests
10 import doqclient
11 from doqclient import quic_query
12
class TestDOQBogus(DNSDistTest):
    """Check how the DNS over QUIC frontend reacts to a malformed query.

    The configuration declares a single backend on 127.0.0.1 and a single
    DoQ frontend on 127.0.0.1; the only test sends a query through
    ``quic_bogus_query`` and expects the stream to be reset.
    """

    # Key, certificate chain, server name and CA file used for the DoQ
    # frontend and for verifying it from the client side.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # Presumably the attribute names substituted, in order, into the
    # placeholders of _config_template -- confirm against DNSDistTest.
    _config_params = ['_testServerPort', '_doqServerPort', '_serverCert', '_serverKey']

    def testDOQBogus(self):
        """
        DOQ: Test a bogus query (wrong packed length)
        """
        name = 'bogus.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # The call must raise StreamResetError: assertRaises fails the test
        # when nothing is raised and lets any other exception propagate.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            quic_bogus_query(
                query,
                '127.0.0.1',
                2.0,
                self._doqServerPort,
                verify=self._caCert,
                server_hostname=self._serverName,
            )

        # NOTE(review): presumably the DoQ application error code carried by
        # the reset; 2 is DOQ_PROTOCOL_ERROR in RFC 9250 -- confirm against
        # doqclient.StreamResetError.
        self.assertEqual(caught.exception.error, 2)
41
class TestDOQ(QUICTests, DNSDistTest):
    """Run the shared QUICTests suite against a DNS over QUIC frontend.

    The configuration declares one backend, four name-based rules (drop,
    REFUSED, spoofed answer, pool without backend) and a DoQ frontend on
    127.0.0.1. The two methods below bind the generic QUIC hooks to the
    DoQ helpers.
    """

    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addAction("drop.doq.tests.powerdns.com.", DropAction())
    addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # Presumably filled, in order, into the placeholders of the template
    # above -- confirm against DNSDistTest.
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to this class's frontend port, using the test CA file."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery with this class's port, CA file and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
65
class TestDOQWithCache(QUICWithCacheTests, DNSDistTest):
    """Run the shared QUICWithCacheTests suite against a DoQ frontend.

    On top of one backend and one DoQ frontend, the configuration creates a
    packet cache (``newPacketCache(100, {maxTTL=86400, minTTL=1})``) and
    attaches it to the default pool.
    """

    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    # Presumably filled, in order, into the placeholders of the template
    # above -- confirm against DNSDistTest.
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to this class's frontend port, using the test CA file."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery with this class's port, CA file and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
87
class TestDOQWithACL(QUICACLTests, DNSDistTest):
    """Run the shared QUICACLTests suite against a DoQ frontend.

    The configuration restricts the ACL to 192.0.2.1/32 while the frontend
    listens on 127.0.0.1.
    NOTE(review): presumably this makes queries from the local test client
    fall outside the ACL -- confirm against QUICACLTests.
    """

    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    setACL("192.0.2.1/32")
    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # Presumably filled, in order, into the placeholders of the template
    # above -- confirm against DNSDistTest.
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to this class's frontend port, using the test CA file."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery with this class's port, CA file and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
107
class TestDOQCertificateReloading(DNSDistTest):
    """Check that the DoQ frontend presents a new certificate after a reload.

    The configuration enables the console (key and control socket), one
    backend and one DoQ frontend that uses the 'server-doq' key and chain.
    """

    # Console key, and its base64 text form as passed to setKey().
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server-doq.chain'
    _serverKey = 'server-doq.key'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")

    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # Presumably filled, in order, into the placeholders of the template
    # above -- confirm against DNSDistTest.
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    @classmethod
    def setUpClass(cls):
        """Generate the 'server-doq' certificate and key, then start responders, dnsdist and sockets."""
        # The certificate is generated first, before dnsdist is started.
        cls.generateNewCertificateAndKey('server-doq')
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

    def testCertificateReloaded(self):
        """
        DOQ: Certificate reloading
        """
        qname = 'certificate-reload.doq.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=False)
        query.id = 0

        def currentSerial():
            # quic_query returns a pair; the second element is compared as
            # the certificate serial.
            _, observed = quic_query(
                query,
                '127.0.0.1',
                0.5,
                self._doqServerPort,
                verify=self._caCert,
                server_hostname=self._serverName,
            )
            return observed

        serialBefore = currentSerial()

        # Replace the certificate and key, then ask dnsdist to reload them.
        self.generateNewCertificateAndKey('server-doq')
        self.sendConsoleCommand("reloadAllCertificates()")

        serialAfter = currentSerial()
        # check that the serial is different
        self.assertNotEqual(serialBefore, serialAfter)