]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_XPF.py
4 from dnsdisttests
import DNSDistTest
6 class XPFTest(DNSDistTest
):
8 dnsdist is configured to add XPF to the query
12 _config_template
= """
13 newServer{address="127.0.0.1:%d", addXPF=%d}
15 _config_params
= ['_testServerPort', '_xpfCode']
17 def checkMessageHasXPF(self
, msg
, expectedValue
):
18 self
.assertGreaterEqual(len(msg
.additional
), 1)
21 for add
in msg
.additional
:
22 if add
.rdtype
== self
._xpfCode
:
24 self
.assertEqual(add
.rdclass
, dns
.rdataclass
.IN
)
25 self
.assertEqual(add
.ttl
, 0)
26 xpfData
= add
.to_rdataset()[0].to_text()
28 self
.assertEqual(xpfData
[:26], expectedValue
[:26])
30 self
.assertTrue(found
)
36 name
= 'xpf.tests.powerdns.com.'
37 query
= dns
.message
.make_query(name
, 'A', 'IN')
39 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
40 # 0x04 is IPv4, 0x11 (17) is UDP then 127.0.0.1 as source and destination
41 # and finally the ports, zeroed because we have no way to know them beforehand
42 xpfData
= "\\# 14 04117f0000017f00000100000000"
43 rdata
= dns
.rdata
.from_text(dns
.rdataclass
.IN
, self
._xpfCode
, xpfData
)
44 rrset
= dns
.rrset
.from_rdata(".", 0, rdata
)
45 expectedQuery
.additional
.append(rrset
)
47 response
= dns
.message
.make_response(expectedQuery
)
49 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
50 self
.assertTrue(receivedQuery
)
51 self
.assertTrue(receivedResponse
)
52 receivedQuery
.id = expectedQuery
.id
53 receivedResponse
.id = response
.id
55 self
.checkMessageHasXPF(receivedQuery
, xpfData
)
56 self
.assertEqual(response
, receivedResponse
)
58 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
59 # 0x04 is IPv4, 0x06 (6) is TCP then 127.0.0.1 as source and destination
60 # and finally the ports, zeroed because we have no way to know them beforehand
61 xpfData
= "\\# 14 04067f0000017f00000100000000"
62 rdata
= dns
.rdata
.from_text(dns
.rdataclass
.IN
, self
._xpfCode
, xpfData
)
63 rrset
= dns
.rrset
.from_rdata(".", 0, rdata
)
64 expectedQuery
.additional
.append(rrset
)
66 response
= dns
.message
.make_response(expectedQuery
)
68 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
69 self
.assertTrue(receivedQuery
)
70 self
.assertTrue(receivedResponse
)
71 receivedQuery
.id = expectedQuery
.id
72 receivedResponse
.id = response
.id
74 self
.checkMessageHasXPF(receivedQuery
, xpfData
)
75 self
.assertEqual(response
, receivedResponse
)