]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_XPF.py
4 from dnsdisttests
import DNSDistTest
6 class XPFTest(DNSDistTest
):
8 dnsdist is configured to add XPF to the query
12 _config_template
= """
13 newServer{address="127.0.0.1:%d", addXPF=%d}
15 _config_params
= ['_testServerPort', '_xpfCode']
17 def checkMessageHasXPF(self
, msg
, expectedValue
):
18 self
.assertGreaterEqual(len(msg
.additional
), 1)
21 for add
in msg
.additional
:
22 if add
.rdtype
== self
._xpfCode
:
24 self
.assertEquals(add
.rdclass
, dns
.rdataclass
.IN
)
25 self
.assertEquals(add
.ttl
, 0)
26 xpfData
= add
.to_rdataset()[0].to_text()
28 self
.assertEquals(xpfData
[:26], expectedValue
[:26])
30 self
.assertTrue(found
)
36 name
= 'xpf.tests.powerdns.com.'
37 query
= dns
.message
.make_query(name
, 'A', 'IN')
39 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
40 # 0x04 is IPv4, 0x11 (17) is UDP then 127.0.0.1 as source and destination
41 # and finally the ports, zeroed because we have no way to know them beforehand
42 xpfData
= "\# 14 04117f0000017f00000100000000"
43 rdata
= dns
.rdata
.from_text(dns
.rdataclass
.IN
, self
._xpfCode
, xpfData
)
44 rrset
= dns
.rrset
.from_rdata(name
, 60, rdata
)
45 expectedQuery
.additional
.append(rrset
)
47 response
= dns
.message
.make_response(expectedQuery
)
49 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
50 self
.assertTrue(receivedQuery
)
51 self
.assertTrue(receivedResponse
)
52 receivedQuery
.id = expectedQuery
.id
53 receivedResponse
.id = response
.id
55 self
.assertEquals(receivedQuery
, expectedQuery
)
56 self
.checkMessageHasXPF(receivedQuery
, xpfData
)
57 self
.assertEquals(response
, receivedResponse
)
59 expectedQuery
= dns
.message
.make_query(name
, 'A', 'IN')
60 # 0x04 is IPv4, 0x06 (6) is TCP then 127.0.0.1 as source and destination
61 # and finally the ports, zeroed because we have no way to know them beforehand
62 xpfData
= "\# 14 04067f0000017f00000100000000"
63 rdata
= dns
.rdata
.from_text(dns
.rdataclass
.IN
, self
._xpfCode
, xpfData
)
64 rrset
= dns
.rrset
.from_rdata(name
, 60, rdata
)
65 expectedQuery
.additional
.append(rrset
)
67 response
= dns
.message
.make_response(expectedQuery
)
69 (receivedQuery
, receivedResponse
) = self
.sendTCPQuery(query
, response
)
70 self
.assertTrue(receivedQuery
)
71 self
.assertTrue(receivedResponse
)
72 receivedQuery
.id = expectedQuery
.id
73 receivedResponse
.id = response
.id
75 self
.assertEquals(receivedQuery
, expectedQuery
)
76 self
.checkMessageHasXPF(receivedQuery
, xpfData
)
77 self
.assertEquals(response
, receivedResponse
)