]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_DOQ.py
Merge pull request #14272 from romeroalx/fix-docs-scm
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOQ.py
CommitLineData
3dc49a89 1#!/usr/bin/env python
9ec97c74 2import base64
3dc49a89
CHB
3import dns
4import clientsubnetoption
5
6from dnsdisttests import DNSDistTest
7from dnsdisttests import pickAvailablePort
e7000cce 8from doqclient import quic_bogus_query
5d4d8e22 9from quictests import QUICTests, QUICWithCacheTests, QUICACLTests, QUICGetLocalAddressOnAnyBindTests, QUICXFRTests
e7000cce 10import doqclient
9ec97c74 11from doqclient import quic_query
e7000cce
CHB
12
class TestDOQBogus(DNSDistTest):
    """Send a deliberately malformed DoQ query and check how the stream ends."""

    # TLS material handed to addDOQLocal() and to the DoQ client.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    # Port of the DoQ listener, picked once when the class is defined.
    _doqServerPort = pickAvailablePort()
    # One backend plus one DoQ listener; the placeholders are presumably
    # filled from _config_params, in order, by the test harness.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']

    def testDOQBogus(self):
        """
        DOQ: Test a bogus query (wrong packed length)
        """
        name = 'bogus.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # The query must not be answered: the client is expected to raise
        # StreamResetError. assertRaises fails the test if nothing is raised
        # and lets any other exception type propagate as an error.
        with self.assertRaises(doqclient.StreamResetError) as caught:
            quic_bogus_query(query, '127.0.0.1', 2.0, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)

        # NOTE(review): 2 presumably maps to DOQ_PROTOCOL_ERROR from RFC 9250;
        # confirm against doqclient.StreamResetError.
        self.assertEqual(caught.exception.error, 2)
3dc49a89 41
class TestDOQ(QUICTests, DNSDistTest):
    """Bind the QUICTests mixin to a dnsdist instance with a DoQ listener."""

    # Port of the DoQ listener, picked once when the class is defined.
    _doqServerPort = pickAvailablePort()
    # TLS material used by the listener and by the client helpers below.
    _caCert = 'ca.pem'
    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    # Rules for drop / refused / spoof / empty-pool names, then the listener.
    # The placeholders are presumably filled from _config_params, in order.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addAction("drop.doq.tests.powerdns.com.", DropAction())
    addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """

    def getQUICConnection(self):
        """Return whatever getDOQConnection() yields for this class's port and CA."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery() with this class's port, CA and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            useQueue=useQueue,
            connection=connection,
            caFile=self._caCert,
            serverName=self._serverName,
        )
57b57259 65
class TestDOQWithCache(QUICWithCacheTests, DNSDistTest):
    """Bind the QUICWithCacheTests mixin to a DoQ listener with a packet cache."""

    # Port of the DoQ listener, picked once when the class is defined.
    _doqServerPort = pickAvailablePort()
    # TLS material used by the listener and by the client helpers below.
    _caCert = 'ca.pem'
    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    # Backend, DoQ listener, and a 100-entry packet cache on the default pool.
    # The placeholders are presumably filled from _config_params, in order.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """

    def getQUICConnection(self):
        """Return whatever getDOQConnection() yields for this class's port and CA."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery() with this class's port, CA and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            useQueue=useQueue,
            connection=connection,
            caFile=self._caCert,
            serverName=self._serverName,
        )
2aaf9ecd
CHB
87
class TestDOQWithACL(QUICACLTests, DNSDistTest):
    """Bind the QUICACLTests mixin to a DoQ listener with a restrictive ACL."""

    # Port of the DoQ listener, picked once when the class is defined.
    _doqServerPort = pickAvailablePort()
    # TLS material used by the listener and by the client helpers below.
    _caCert = 'ca.pem'
    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    # The ACL only lists 192.0.2.1/32 while the listener binds to 127.0.0.1.
    # The placeholders are presumably filled from _config_params, in order.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    setACL("192.0.2.1/32")
    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """

    def getQUICConnection(self):
        """Return whatever getDOQConnection() yields for this class's port and CA."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery() with this class's port, CA and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            useQueue=useQueue,
            connection=connection,
            caFile=self._caCert,
            serverName=self._serverName,
        )
5d4d8e22
RG
107
class TestDOQXFR(QUICXFRTests, DNSDistTest):
    """Bind the QUICXFRTests mixin to a DoQ listener in front of a TCP-only backend."""

    # Port of the DoQ listener, picked once when the class is defined.
    _doqServerPort = pickAvailablePort()
    # TLS material used by the listener and by the client helpers below.
    _caCert = 'ca.pem'
    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    _verboseMode = True
    # The backend is declared with tcpOnly=True.
    # The placeholders are presumably filled from _config_params, in order.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey']
    _config_template = """
    newServer{address="127.0.0.1:%d", tcpOnly=True}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """

    def getQUICConnection(self):
        """Return whatever getDOQConnection() yields for this class's port and CA."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery() with this class's port, CA and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            useQueue=useQueue,
            connection=connection,
            caFile=self._caCert,
            serverName=self._serverName,
        )
9ec97c74
RG
127
class TestDOQCertificateReloading(DNSDistTest):
    """Check that reloadAllCertificates() makes the DoQ listener serve a new certificate."""

    # Console key; the base64 form is what gets written into setKey().
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # Dedicated key and chain, regenerated by this test.
    _serverKey = 'server-doq.key'
    _serverCert = 'server-doq.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    # Port of the DoQ listener, picked once when the class is defined.
    _doqServerPort = pickAvailablePort()
    # Console access, one backend and one DoQ listener; the placeholders are
    # presumably filled from _config_params, in order.
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")

    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    _config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_doqServerPort','_serverCert', '_serverKey']

    @classmethod
    def setUpClass(cls):
        """Generate the 'server-doq' certificate, then start responders, dnsdist and sockets."""
        # The certificate is generated first, before dnsdist is started.
        cls.generateNewCertificateAndKey('server-doq')
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

    def testCertificateReloaded(self):
        """
        DOQ: Certificate reloading
        """
        qname = 'certificate-reload.doq.tests.powerdns.com.'
        query = dns.message.make_query(qname, 'A', 'IN', use_edns=False)
        query.id = 0

        def currentSerial():
            # quic_query returns a pair; the second item is used as the serial.
            (_, value) = quic_query(query, '127.0.0.1', 0.5, self._doqServerPort, verify=self._caCert, server_hostname=self._serverName)
            return value

        serialBefore = currentSerial()

        self.generateNewCertificateAndKey('server-doq')
        self.sendConsoleCommand("reloadAllCertificates()")

        serialAfter = currentSerial()
        # check that the serial is different
        self.assertNotEqual(serialBefore, serialAfter)
88913b83
RG
165
class TestDOQGetLocalAddressOnAnyBind(QUICGetLocalAddressOnAnyBindTests, DNSDistTest):
    """Bind the QUICGetLocalAddressOnAnyBindTests mixin to DoQ listeners on 0.0.0.0 and [::]."""

    # Port shared by both DoQ listeners, picked once when the class is defined.
    _doqServerPort = pickAvailablePort()
    # TLS material used by the listeners and by the client helpers below.
    _caCert = 'ca.pem'
    _serverName = 'tls.tests.dnsdist.org'
    _serverCert = 'server.chain'
    _serverKey = 'server.key'
    _acl = ['127.0.0.1/32', '::1/128']
    _skipListeningOnCL = True
    # The port / cert / key triple appears twice, once per addDOQLocal() line.
    _config_params = ['_testServerPort', '_doqServerPort','_serverCert', '_serverKey', '_doqServerPort','_serverCert', '_serverKey']
    # The Lua rule spoofs a name built from the query's local address, with
    # the dots replaced by dashes.
    _config_template = """
    function answerBasedOnLocalAddress(dq)
      local dest = tostring(dq.localaddr)
      local i, j = string.find(dest, "[0-9.]+")
      local addr = string.sub(dest, i, j)
      local dashAddr = string.gsub(addr, "[.]", "-")
      return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
    end
    addAction("local-address-any.quic.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
    newServer{address="127.0.0.1:%s"}
    addDOQLocal("0.0.0.0:%d", "%s", "%s")
    addDOQLocal("[::]:%d", "%s", "%s")
    """

    def getQUICConnection(self):
        """Return whatever getDOQConnection() yields for this class's port and CA."""
        port = self._doqServerPort
        caFile = self._caCert
        return self.getDOQConnection(port, caFile)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Forward the query to sendDOQQuery() with this class's port, CA and server name."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            useQueue=useQueue,
            connection=connection,
            caFile=self._caCert,
            serverName=self._serverName,
        )