Commit | Line | Data |
---|---|---|
3dc49a89 | 1 | #!/usr/bin/env python |
9ec97c74 | 2 | import base64 |
3dc49a89 CHB |
3 | import dns |
4 | import clientsubnetoption | |
5 | ||
6 | from dnsdisttests import DNSDistTest | |
7 | from dnsdisttests import pickAvailablePort | |
e7000cce | 8 | from doqclient import quic_bogus_query |
2aaf9ecd | 9 | from quictests import QUICTests, QUICWithCacheTests, QUICACLTests |
e7000cce | 10 | import doqclient |
9ec97c74 | 11 | from doqclient import quic_query |
e7000cce CHB |
12 | |
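class TestDOQBogus(DNSDistTest):
    """Check how the DNS over QUIC frontend reacts to a malformed query.

    The configuration declares one backend on 127.0.0.1 and one DoQ
    frontend on 127.0.0.1 using the test certificate chain and key.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # One attribute name per %-placeholder in _config_template, in order
    # (presumably substituted by DNSDistTest when it writes the config).
    _config_params = ['_testServerPort', '_doqServerPort', '_serverCert', '_serverKey']

    def testDOQBogus(self):
        """
        DOQ: Test a bogus query (wrong packed length)
        """
        name = 'bogus.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # The malformed query has to end in a stream reset: assertRaises fails
        # the test with a clear message if quic_bogus_query() returns normally,
        # and any other exception type still propagates as a test error.
        with self.assertRaises(doqclient.StreamResetError) as raised:
            quic_bogus_query(
                query,
                '127.0.0.1',
                2.0,
                self._doqServerPort,
                verify=self._caCert,
                server_hostname=self._serverName,
            )

        # NOTE(review): 2 is DOQ_PROTOCOL_ERROR in RFC 9250; presumably that is
        # the application error code carried in e.error - confirm in doqclient.
        self.assertEqual(raised.exception.error, 2)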
3dc49a89 | 41 | |
class TestDOQ(QUICTests, DNSDistTest):
    """Run the shared QUICTests cases against a DNS over QUIC frontend.

    The configuration declares one backend, four name-based rules (drop,
    REFUSED, spoof to 1.2.3.4, route to a pool without backend) and one DoQ
    frontend on 127.0.0.1 using the test certificate chain and key.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addAction("drop.doq.tests.powerdns.com.", DropAction())
    addAction("refused.doq.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
    addAction("spoof.doq.tests.powerdns.com.", SpoofAction("1.2.3.4"))
    addAction("no-backend.doq.tests.powerdns.com.", PoolAction('this-pool-has-no-backend'))

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # One attribute name per %-placeholder in _config_template, in order
    # (presumably substituted by DNSDistTest when it writes the config).
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to this class's frontend port, using _caCert."""
        # NOTE(review): unlike sendQUICQuery, no server name is passed here;
        # confirm getDOQConnection still checks the certificate host name.
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ to the frontend port and return sendDOQQuery's result.

        The CA file and expected server name of this class are always passed;
        `response`, `useQueue` and `connection` are forwarded unchanged.
        """
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
57b57259 | 65 | |
class TestDOQWithCache(QUICWithCacheTests, DNSDistTest):
    """Run the shared QUICWithCacheTests cases against a DoQ frontend.

    The configuration declares one backend, one DoQ frontend on 127.0.0.1
    and a packet cache (100 entries, TTL bounds 1 and 86400) attached to
    the default pool.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")

    pc = newPacketCache(100, {maxTTL=86400, minTTL=1})
    getPool(""):setCache(pc)
    """
    # One attribute name per %-placeholder in _config_template, in order
    # (presumably substituted by DNSDistTest when it writes the config).
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to this class's frontend port, using _caCert."""
        # NOTE(review): unlike sendQUICQuery, no server name is passed here;
        # confirm getDOQConnection still checks the certificate host name.
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ to the frontend port and return sendDOQQuery's result.

        The CA file and expected server name of this class are always passed;
        `response`, `useQueue` and `connection` are forwarded unchanged.
        """
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
2aaf9ecd CHB |
87 | |
class TestDOQWithACL(QUICACLTests, DNSDistTest):
    """Run the shared QUICACLTests cases against a DoQ frontend behind an ACL.

    The configuration declares one backend, an ACL of 192.0.2.1/32 and one
    DoQ frontend on 127.0.0.1.
    """

    _serverKey = 'server.key'
    _serverCert = 'server.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    # NOTE(review): the ACL admits only 192.0.2.1 while the frontend listens
    # on 127.0.0.1, so the test client is presumably outside the ACL and
    # QUICACLTests checks that it is refused - confirm in quictests.
    _config_template = """
    newServer{address="127.0.0.1:%d"}

    setACL("192.0.2.1/32")
    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # One attribute name per %-placeholder in _config_template, in order
    # (presumably substituted by DNSDistTest when it writes the config).
    _config_params = [
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    def getQUICConnection(self):
        """Return a DoQ connection to this class's frontend port, using _caCert."""
        # NOTE(review): unlike sendQUICQuery, no server name is passed here;
        # confirm getDOQConnection still checks the certificate host name.
        port = self._doqServerPort
        return self.getDOQConnection(port, self._caCert)

    def sendQUICQuery(self, query, response=None, useQueue=True, connection=None):
        """Send `query` over DoQ to the frontend port and return sendDOQQuery's result.

        The CA file and expected server name of this class are always passed;
        `response`, `useQueue` and `connection` are forwarded unchanged.
        """
        return self.sendDOQQuery(
            self._doqServerPort,
            query,
            response=response,
            caFile=self._caCert,
            useQueue=useQueue,
            serverName=self._serverName,
            connection=connection,
        )
9ec97c74 RG |
107 | |
class TestDOQCertificateReloading(DNSDistTest):
    """Check that the DoQ frontend serves a new certificate after a reload.

    The configuration sets a console key, opens a control socket on
    127.0.0.1, and declares one backend and one DoQ frontend that uses the
    'server-doq' certificate chain and key.
    """

    # Console key generated when the class is defined, and its base64 text
    # form, which is what the setKey() line of the configuration receives.
    _consoleKey = DNSDistTest.generateConsoleKey()
    _consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
    _serverKey = 'server-doq.key'
    _serverCert = 'server-doq.chain'
    _serverName = 'tls.tests.dnsdist.org'
    _caCert = 'ca.pem'
    _doqServerPort = pickAvailablePort()
    _config_template = """
    setKey("%s")
    controlSocket("127.0.0.1:%s")

    newServer{address="127.0.0.1:%d"}

    addDOQLocal("127.0.0.1:%d", "%s", "%s")
    """
    # One attribute name per %-placeholder in _config_template, in order
    # (presumably substituted by DNSDistTest when it writes the config).
    _config_params = [
        '_consoleKeyB64',
        '_consolePort',
        '_testServerPort',
        '_doqServerPort',
        '_serverCert',
        '_serverKey',
    ]

    @classmethod
    def setUpClass(cls):
        """Create the 'server-doq' certificate and key, then start the fixtures.

        The certificate is generated first, before the responders, dnsdist
        and the sockets are started.
        """
        cls.generateNewCertificateAndKey('server-doq')
        cls.startResponders()
        cls.startDNSDist()
        cls.setUpSockets()

    def testCertificateReloaded(self):
        """Query, regenerate the certificate, reload, query again, compare serials."""
        name = 'certificate-reload.doq.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        query.id = 0

        # Both queries use identical arguments, so only the server side changes.
        queryArgs = (query, '127.0.0.1', 0.5, self._doqServerPort)
        tlsArgs = {'verify': self._caCert, 'server_hostname': self._serverName}

        # The second element of quic_query's result is taken to be the serial
        # of the certificate the server presented - confirm against doqclient.
        _, initialSerial = quic_query(*queryArgs, **tlsArgs)

        self.generateNewCertificateAndKey('server-doq')
        self.sendConsoleCommand("reloadAllCertificates()")

        _, reloadedSerial = quic_query(*queryArgs, **tlsArgs)
        # An unchanged serial would mean the old certificate is still served.
        self.assertNotEqual(initialSerial, reloadedSerial)