Commit | Line | Data |
---|---|---|
3dc49a89 | 1 | #!/usr/bin/env python |
9ec97c74 | 2 | import base64 |
3dc49a89 CHB |
3 | import dns |
4 | import clientsubnetoption | |
5 | ||
6 | from dnsdisttests import DNSDistTest | |
7 | from dnsdisttests import pickAvailablePort | |
e7000cce | 8 | from doqclient import quic_bogus_query |
5d4d8e22 | 9 | from quictests import QUICTests, QUICWithCacheTests, QUICACLTests, QUICGetLocalAddressOnAnyBindTests, QUICXFRTests |
e7000cce | 10 | import doqclient |
9ec97c74 | 11 | from doqclient import quic_query |
e7000cce CHB |
12 | |
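class TestDOQBogus(DNSDistTest):
    """Check that the DoQ frontend rejects a malformed query.

    The only test sends a query built by ``quic_bogus_query`` and expects
    the stream to be reset rather than answered.
    """

    # Private key and certificate chain handed to addDOQLocal() in the template.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # Host name and CA file the test client passes for certificate verification.
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # The %d/%s placeholders are filled, in order, from _config_params.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    _config_params = ['_testServerPort', '_doqServerPort', '_serverCert', '_serverKey']

    def testDOQBogus(self):
        """
        DOQ: Test a bogus query (wrong packed length)
        """
        name = 'bogus.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        # DoQ (RFC 9250) requires the DNS message ID to be 0.
        query.id = 0

        # assertRaises fails the test with a descriptive message if the bogus
        # query is accepted, instead of the opaque "True is not false".
        with self.assertRaises(doqclient.StreamResetError) as raised:
            quic_bogus_query(
                query,
                '127.0.0.1',
                2.0,
                self._doqServerPort,
                verify=self._caCert,
                server_hostname=self._serverName,
            )

        # 2 is DOQ_PROTOCOL_ERROR in RFC 9250; presumably the code dnsdist
        # resets the stream with on a length mismatch. Confirm against doqclient.
        self.assertEqual(raised.exception.error, 2)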
3dc49a89 | 41 | |
class TestDOQ(QUICTests, DNSDistTest):
    """Run the shared QUICTests suite against a DNS-over-QUIC frontend.

    The class supplies the dnsdist configuration and the two transport
    hooks (getQUICConnection / sendQUICQuery) that bind the suite to DoQ.
    """

    # Private key and certificate chain handed to addDOQLocal() in the template.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # Host name and CA file the test client passes for certificate verification.
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # One rule per behaviour: drop, REFUSED, spoofed answer, and a pool that
    # has no backend. The %d/%s placeholders come from _config_params.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addAction("drop.doq.tests.powerdns.com.", DropAction())
    addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to the frontend, verified against _caCert."""
        # NOTE(review): no server name is passed here, unlike sendQUICQuery;
        # confirm getDOQConnection still checks the certificate name.
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ and return whatever sendDOQQuery returns."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
57b57259 | 65 | |
class TestDOQWithCache(QUICWithCacheTests, DNSDistTest):
    """Run the shared QUICWithCacheTests suite over DoQ with a packet cache."""

    # Private key and certificate chain handed to addDOQLocal() in the template.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # Host name and CA file the test client passes for certificate verification.
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # A 100-entry packet cache is attached to the default pool ("").
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to the frontend, verified against _caCert."""
        # NOTE(review): no server name is passed here, unlike sendQUICQuery;
        # confirm getDOQConnection still checks the certificate name.
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ and return whatever sendDOQQuery returns."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
2aaf9ecd CHB |
87 | |
class TestDOQWithACL(QUICACLTests, DNSDistTest):
    """Run the shared QUICACLTests suite over DoQ with a restrictive ACL."""

    # Private key and certificate chain handed to addDOQLocal() in the template.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # Host name and CA file the test client passes for certificate verification.
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # The ACL admits only 192.0.2.1 (a documentation-range address), so the
    # test client on 127.0.0.1 falls outside it. The inherited tests
    # presumably assert that such queries get no answer; see QUICACLTests.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    setACL("192.0.2.1/32")
    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to the frontend, verified against _caCert."""
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ and return whatever sendDOQQuery returns."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
5d4d8e22 RG |
107 | |
class TestDOQXFR(QUICXFRTests, DNSDistTest):
    """Run the shared QUICXFRTests suite over DoQ with a TCP-only backend."""

    # Private key and certificate chain handed to addDOQLocal() in the template.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # Host name and CA file the test client passes for certificate verification.
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # The backend is declared tcpOnly, unlike the other classes in this file.
    _config_template = """
    newServer{address="127.0.0.1:%d", tcpOnly=True}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]
    # Presumably makes the harness start dnsdist in verbose mode; confirm in DNSDistTest.
    _verboseMode = True

    def getQUICConnection(self):
        """Return a DoQ connection to the frontend, verified against _caCert."""
        # NOTE(review): no server name is passed here, unlike sendQUICQuery;
        # confirm getDOQConnection still checks the certificate name.
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ and return whatever sendDOQQuery returns."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
9ec97c74 RG |
127 | |
class TestDOQCertificateReloading(DNSDistTest):
    """Check that the DoQ frontend serves a new certificate after a reload."""

    # Console key from the harness, base64-encoded for setKey() in the template.
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    # This class uses its own key/chain pair ('server-doq.*'), which the test
    # regenerates, rather than the shared 'server.*' files.
    _serverKey = 'server-doq.key'
    _serverCert = 'server-doq.chain'
    # Host name and CA file the test client passes for certificate verification.
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # The control socket listens on loopback only and is protected by setKey().
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")

    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    @classmethod
    def setUpClass(cls):
        """Generate the 'server-doq' certificate, then start the test fixture."""
        # The certificate is generated before dnsdist starts; presumably this
        # writes the server-doq.chain/.key files named in the configuration.
        cls.generateNewCertificateAndKey('server-doq')
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

    def testCertificateReloaded(self):
        """Query, regenerate and reload the certificate, then compare serials."""
        name = 'certificate-reload.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        # DoQ (RFC 9250) requires the DNS message ID to be 0.
        query.id = 0

        def fetchSerial():
            # quic_query returns a pair; the second item is used as the
            # certificate serial presented by the server.
            _, presented = quic_query(
                query,
                '127.0.0.1',
                0.5,
                self._doqServerPort,
                verify=self._caCert,
                server_hostname=self._serverName,
            )
            return presented

        firstSerial = fetchSerial()

        self.generateNewCertificateAndKey('server-doq')
        self.sendConsoleCommand("reloadAllCertificates()")

        secondSerial = fetchSerial()
        # A changed serial shows that the reloaded certificate is being served.
        self.assertNotEqual(firstSerial, secondSerial)
88913b83 RG |
165 | |
class TestDOQGetLocalAddressOnAnyBind(QUICGetLocalAddressOnAnyBindTests, DNSDistTest):
    """Run the shared local-address suite over DoQ bound to wildcard addresses."""

    # Private key and certificate chain handed to addDOQLocal() in the template.
    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    # Host name and CA file the test client passes for certificate verification.
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # The Lua action spoofs a name that encodes the local address the query
    # arrived on (dots replaced by dashes). DoQ listens on both 0.0.0.0 and
    # [::], so the port/cert/key triple appears twice in _config_params.
    _config_template = """
    function answerBasedOnLocalAddress(dq)
      local dest = tostring(dq.localaddr)
      local i, j = string.find(dest, "[0-9.]+")
      local addr = string.sub(dest, i, j)
      local dashAddr = string.gsub(addr, "[.]", "-")
      return DNSAction.Spoof, "address-was-"..dashAddr..".local-address-any.advanced.tests.powerdns.com."
    end
    addAction("local-address-any.quic.tests.powerdns.com.", LuaAction(answerBasedOnLocalAddress))
    newServer{address="127.0.0.1:%s"}
    addDOQLocal("0.0.0.0:%d", "%s", "%s")
    addDOQLocal("[::]:%d", "%s", "%s")
    """
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]
    # Loopback-only ACL; presumably applied by the harness so the wildcard
    # binds above accept local clients only. Confirm in DNSDistTest.
    _acl = ['127.0.0.1/32', '::1/128']
    # Presumably stops the harness adding its own listener on the command
    # line; confirm in DNSDistTest.
    _skipListeningOnCL = True

    def getQUICConnection(self):
        """Return a DoQ connection to the frontend, verified against _caCert."""
        # NOTE(review): no server name is passed here, unlike sendQUICQuery;
        # confirm getDOQConnection still checks the certificate name.
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ and return whatever sendDOQQuery returns."""
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )