]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_XPF.py
Merge pull request #8713 from rgacogne/auth-strict-caches-size
[thirdparty/pdns.git] / regression-tests.dnsdist / test_XPF.py
1 #!/usr/bin/env python
2
3 import dns
4 from dnsdisttests import DNSDistTest
5
class XPFTest(DNSDistTest):
    """
    dnsdist is configured to add XPF to the query
    """

    # Record type used for the XPF record: handed to dnsdist through the
    # addXPF backend option below, and used by checkMessageHasXPF() to
    # locate the record in the additional section.
    _xpfCode = 65422
    _config_template = """
    newServer{address="127.0.0.1:%d", addXPF=%d}
    """
    # Presumably the names of the attributes substituted, in order, into
    # _config_template by DNSDistTest -- confirm against dnsdisttests.
    _config_params = ['_testServerPort', '_xpfCode']

    def checkMessageHasXPF(self, msg, expectedValue):
        """
        Assert that msg carries an XPF record matching expectedValue.

        msg is a parsed DNS message whose additional section is scanned for
        records of type _xpfCode. expectedValue is the expected rdata in
        generic text form ("\\# <length> <hex>"); the trailing port digits
        are not compared.

        Fails if the additional section is empty, if no record of type
        _xpfCode is present, or if such a record has a class other than IN,
        a non-zero TTL, or rdata differing before the ports.
        """
        self.assertGreaterEqual(len(msg.additional), 1)

        found = False
        for add in msg.additional:
            if add.rdtype != self._xpfCode:
                continue

            found = True
            self.assertEqual(add.rdclass, dns.rdataclass.IN)
            self.assertEqual(add.ttl, 0)
            xpfData = add.to_rdataset()[0].to_text()
            # skip the ports: the first 26 characters are the "\# 14 "
            # prefix (6 characters) followed by the 20 hex digits that
            # precede the ports
            self.assertEqual(xpfData[:26], expectedValue[:26])

        self.assertTrue(found, "no XPF record found in the additional section")

    def _checkXPFOverTransport(self, sendQuery, query, name, xpfData):
        """
        Send query with sendQuery and check the XPF record that was added.

        sendQuery is one of the bound send helpers (self.sendUDPQuery or
        self.sendTCPQuery); it is called as sendQuery(query, response) and
        is expected to return a (receivedQuery, receivedResponse) pair.
        name is the queried name and xpfData the expected XPF rdata in
        generic text form, with the ports zeroed.
        """
        expectedQuery = dns.message.make_query(name, 'A', 'IN')
        rdata = dns.rdata.from_text(dns.rdataclass.IN, self._xpfCode, xpfData)
        rrset = dns.rrset.from_rdata(name, 60, rdata)
        expectedQuery.additional.append(rrset)

        response = dns.message.make_response(expectedQuery)

        (receivedQuery, receivedResponse) = sendQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # the IDs are overwritten so that they cannot make the comparisons
        # below fail
        receivedQuery.id = expectedQuery.id
        receivedResponse.id = response.id

        self.assertEqual(receivedQuery, expectedQuery)
        # the XPF record itself is checked separately, ignoring the ports
        self.checkMessageHasXPF(receivedQuery, xpfData)
        self.assertEqual(response, receivedResponse)

    def testXPF(self):
        """
        XPF

        The same query is sent over UDP and then over TCP, and the query
        received in each case must carry the XPF record for that protocol.
        """
        name = 'xpf.tests.powerdns.com.'
        query = dns.message.make_query(name, 'A', 'IN')

        # 0x04 is IPv4, 0x11 (17) is UDP then 127.0.0.1 as source and destination
        # and finally the ports, zeroed because we have no way to know them beforehand
        # (raw string: the leading backslash belongs to the generic rdata syntax)
        self._checkXPFOverTransport(self.sendUDPQuery, query, name,
                                    r"\# 14 04117f0000017f00000100000000")

        # 0x04 is IPv4, 0x06 (6) is TCP then 127.0.0.1 as source and destination
        # and finally the ports, zeroed because we have no way to know them beforehand
        self._checkXPFOverTransport(self.sendTCPQuery, query, name,
                                    r"\# 14 04067f0000017f00000100000000")