]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_DOQ.py
Merge pull request #14324 from Habbie/auth-lua-docs-backquote-nit
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOQ.py
1 #!/usr/bin/env python
2 import base64
3 import dns
4 import clientsubnetoption
5
6 from dnsdisttests import DNSDistTest
7 from dnsdisttests import pickAvailablePort
8 from doqclient import quic_bogus_query
9 from quictests import QUICTests, QUICWithCacheTests, QUICACLTests, QUICGetLocalAddressOnAnyBindTests, QUICXFRTests
10 import doqclient
11 from doqclient import quic_query
12
13 class TestDOQBogus(DNSDistTest):
14 _serverKey = 'server.key'
15 _serverCert = 'server.chain'
16 _serverName = 'tls.tests.dnsdist.org'
17 _caCert = 'ca.pem'
18 _doqServerPort = pickAvailablePort()
19 _config_template = """
20 newServer{address="127.0.0.1:%d"}
21
22 addDOQLocal("127.0.0.1:%d", "%s", "%s")
23 """
24 _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
25
26 def testDOQBogus(self):
27 """
28 DOQ: Test a bogus query (wrong packed length)
29 """
30 name = 'bogus.doq.tests.powerdns.com.'
31 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
32 query.id = 0
33 expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
34 expectedQuery.id = 0
35
36 try:
37 message = quic_bogus_query(query, '127.0.0.1', 2.0, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
38 self.assertFalse(True)
39 except doqclient.StreamResetError as e :
40 self.assertEqual(e.error, 2);
41
42 class TestDOQ(QUICTests, DNSDistTest):
43 _serverKey = 'server.key'
44 _serverCert = 'server.chain'
45 _serverName = 'tls.tests.dnsdist.org'
46 _caCert = 'ca.pem'
47 _doqServerPort = pickAvailablePort()
48 _config_template = """
49 newServer{address="127.0.0.1:%d"}
50
51 addAction("drop.doq.tests.powerdns.com.", DropAction())
52 addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
53 addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
54 addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))
55
56 addDOQLocal("127.0.0.1:%d", "%s", "%s")
57 """
58 _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
59
60 def getQUICConnection(self):
61 return self.getDOQConnection(self._doqServerPort, self._caCert)
62
63 def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
64 return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
65
66 class TestDOQWithCache(QUICWithCacheTests, DNSDistTest):
67 _serverKey = 'server.key'
68 _serverCert = 'server.chain'
69 _serverName = 'tls.tests.dnsdist.org'
70 _caCert = 'ca.pem'
71 _doqServerPort = pickAvailablePort()
72 _config_template = """
73 newServer{address="127.0.0.1:%d"}
74
75 addDOQLocal("127.0.0.1:%d", "%s", "%s")
76
77 pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
78 getPool(""):setCache(pc)
79 """
80 _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
81
82 def getQUICConnection(self):
83 return self.getDOQConnection(self._doqServerPort, self._caCert)
84
85 def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
86 return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
87
88 class TestDOQWithACL(QUICACLTests, DNSDistTest):
89 _serverKey = 'server.key'
90 _serverCert = 'server.chain'
91 _serverName = 'tls.tests.dnsdist.org'
92 _caCert = 'ca.pem'
93 _doqServerPort = pickAvailablePort()
94 _config_template = """
95 newServer{address="127.0.0.1:%d"}
96
97 setACL("192.0.2.1/32")
98 addDOQLocal("127.0.0.1:%d", "%s", "%s")
99 """
100 _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
101
102 def getQUICConnection(self):
103 return self.getDOQConnection(self._doqServerPort, self._caCert)
104
105 def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
106 return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
107
108 class TestDOQXFR(QUICXFRTests, DNSDistTest):
109 _serverKey = 'server.key'
110 _serverCert = 'server.chain'
111 _serverName = 'tls.tests.dnsdist.org'
112 _caCert = 'ca.pem'
113 _doqServerPort = pickAvailablePort()
114 _config_template = """
115 newServer{address="127.0.0.1:%d", tcpOnly=True}
116
117 addDOQLocal("127.0.0.1:%d", "%s", "%s")
118 """
119 _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
120 _verboseMode = True
121
122 def getQUICConnection(self):
123 return self.getDOQConnection(self._doqServerPort, self._caCert)
124
125 def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
126 return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)
127
128 class TestDOQCertificateReloading(DNSDistTest):
129 _consoleKey = DNSDistTest.generateConsoleKey()
130 _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
131 _serverKey = 'server-doq.key'
132 _serverCert = 'server-doq.chain'
133 _serverName = 'tls.tests.dnsdist.org'
134 _caCert = 'ca.pem'
135 _doqServerPort = pickAvailablePort()
136 _config_template = """
137 setKey("%s")
138 controlSocket("127.0.0.1:%s")
139
140 newServer{address="127.0.0.1:%d"}
141
142 addDOQLocal("127.0.0.1:%d", "%s", "%s")
143 """
144 _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
145
146 @classmethod
147 def setUpClass(cls):
148 cls.generateNewCertificateAndKey('server-doq')
149 cls.startResponders()
150 cls.startDNSDist()
151 cls.setUpSockets()
152
153 def testCertificateReloaded(self):
154 name = 'certificate-reload.doq.tests.powerdns.com.'
155 query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
156 query.id = 0
157 (_, serial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
158
159 self.generateNewCertificateAndKey('server-doq')
160 self.sendConsoleCommand("reloadAllCertificates()")
161
162 (_, secondSerial) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
163 # check that the serial is different
164 self.assertNotEqual(serial, secondSerial)
165
166 class TestDOQGetLocalAddressOnAnyBind(QUICGetLocalAddressOnAnyBindTests, DNSDistTest):
167 _serverKey = 'server.key'
168 _serverCert = 'server.chain'
169 _serverName = 'tls.tests.dnsdist.org'
170 _caCert = 'ca.pem'
171 _doqServerPort = pickAvailablePort()
172 _config_template = """
173 function answerBasedOnLocalAddress(dq)
174 local dest = tostring(dq.localaddr)
175 local i, j = string.find(dest, "[0-9.]+")
176 local addr = string.sub(dest, i, j)
177 local dashAddr = string.gsub(addr, "[.]", "-")
178 return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
179 end
180 addAction("local-address-any.quic.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
181 newServer{address="127.0.0.1:%s"}
182 addDOQLocal("0.0.0.0:%d", "%s", "%s")
183 addDOQLocal("[::]:%d", "%s", "%s")
184 """
185 _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey', '_doqServerPort','_serverCert', '_serverKey']
186 _acl = ['127.0.0.1/32', '::1/128']
187 _skipListeningOnCL = True
188
189 def getQUICConnection(self):
190 return self.getDOQConnection(self._doqServerPort, self._caCert)
191
192 def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
193 return self.sendDOQQuery(self._doqServerPort, query, response=response, caFile=self._caCert, useQueue=useQueue, serverName=self._serverName, connection=connection)