]> git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_SVCB.py
Merge pull request #13670 from chbruyand/dnsdist-doq-acl
[thirdparty/pdns.git] / regression-tests.dnsdist / test_SVCB.py
1 #!/usr/bin/env python
2 import dns
3 from dnsdisttests import DNSDistTest
4
class TestSVCB(DNSDistTest):
    """
    Regression tests for SVCB (type 64) and HTTPS (type 65) answers
    generated by dnsdist's SpoofSVCAction.

    Each test sends a query over UDP and TCP and checks that exactly one
    answer RRset of the queried type comes back, and that the additional
    section holds the address records expected for the configured
    ipv4hint / ipv6hint values.
    """

    # Lua configuration handed to dnsdist. The single '%s' placeholder
    # (backend port) is filled in by the DNSDistTest machinery, so no other
    # '%' character may appear in this template.
    _config_template = """
    local basicSVC = { newSVCRecordParameters(1, "dot.powerdns.com.", { mandatory={"port"}, alpn={"dot"}, noDefaultAlpn=true, port=853, ipv4hint={ "192.0.2.1" }, ipv6hint={ "2001:db8::1" } }),
                       newSVCRecordParameters(2, "doh.powerdns.com.", { mandatory={"port"}, alpn={"h2"}, port=443, ipv4hint={ "192.0.2.2" }, ipv6hint={ "2001:db8::2" }, key7="/dns-query{?dns}" })
                     }
    addAction(AndRule{QTypeRule(64), SuffixMatchNodeRule("basic.svcb.tests.powerdns.com.")}, SpoofSVCAction(basicSVC, {aa=true}))

    local noHintsSVC = { newSVCRecordParameters(1, "dot.powerdns.com.", { mandatory={"port"}, alpn={"dot"}, noDefaultAlpn=true, port=853}),
                         newSVCRecordParameters(2, "doh.powerdns.com.", { mandatory={"port"}, alpn={"h2"}, port=443, key7="/dns-query{?dns}" })
                       }
    addAction(AndRule{QTypeRule(64), SuffixMatchNodeRule("no-hints.svcb.tests.powerdns.com.")}, SpoofSVCAction(noHintsSVC, {aa=true}))

    local effectiveTargetSVC = { newSVCRecordParameters(1, ".", { mandatory={"port"}, alpn={ "dot" }, noDefaultAlpn=true, port=853, ipv4hint={ "192.0.2.1" }, ipv6hint={ "2001:db8::1" }}),
                                 newSVCRecordParameters(2, ".", { mandatory={"port"}, alpn={ "h2" }, port=443, ipv4hint={ "192.0.2.1" }, ipv6hint={ "2001:db8::1" }, key7="/dns-query{?dns}"})
                               }
    addAction(AndRule{QTypeRule(64), SuffixMatchNodeRule("effective-target.svcb.tests.powerdns.com.")}, SpoofSVCAction(effectiveTargetSVC, {aa=true}))

    local httpsSVC = { newSVCRecordParameters(1, ".", { mandatory={"port"}, alpn={ "h2" }, noDefaultAlpn=true, port=8002, ipv4hint={ "192.0.2.2" }, ipv6hint={ "2001:db8::2" }}) }
    addAction(AndRule{QTypeRule(65), SuffixMatchNodeRule("https.svcb.tests.powerdns.com.")}, SpoofSVCAction(httpsSVC))

    newServer{address="127.0.0.1:%s"}
    """

    def _checkSVCResponse(self, name, qtype, expectedAdditional):
        """
        Query `name` for `qtype` over UDP and TCP and validate the reply.

        name: fully-qualified query name.
        qtype: numeric query type (64 for SVCB, 65 for HTTPS); the single
               answer RRset must carry the same type.
        expectedAdditional: ordered list of (owner, rdtype, address) tuples
               describing the RRsets expected in the additional section,
               each with a TTL of 60 and class IN. An empty list means the
               additional section must be empty.
        """
        query = dns.message.make_query(name, qtype, 'IN')
        # dnsdist set RA = RD for spoofed responses
        query.flags &= ~dns.flags.RD

        # Build the expected RRsets once; they are identical for both transports.
        expectedRRSets = [
            dns.rrset.from_text(owner, 60, dns.rdataclass.IN, rdtype, address)
            for (owner, rdtype, address) in expectedAdditional
        ]

        for method in ("sendUDPQuery", "sendTCPQuery"):
            # subTest tags any failure with the transport that produced it.
            with self.subTest(method=method):
                sender = getattr(self, method)
                # No backend response is queued: the answer is expected to
                # come from dnsdist itself.
                (_, receivedResponse) = sender(query, response=None, useQueue=False)
                self.assertTrue(receivedResponse)
                self.assertEqual(len(receivedResponse.answer), 1)
                self.assertEqual(receivedResponse.answer[0].rdtype, qtype)
                self.assertEqual(len(receivedResponse.additional), len(expectedRRSets))
                # Lengths were just checked to match, so zip() drops nothing.
                for received, expected in zip(receivedResponse.additional, expectedRRSets):
                    self.assertEqual(received, expected)

    def testBasic(self):
        """
        SVCB: Basic service binding

        Hints of both records must show up in the additional section,
        owned by the respective target names.
        """
        self._checkSVCResponse('basic.svcb.tests.powerdns.com.', 64, [
            ('doh.powerdns.com.', dns.rdatatype.A, '192.0.2.2'),
            ('dot.powerdns.com.', dns.rdatatype.A, '192.0.2.1'),
            ('doh.powerdns.com.', dns.rdatatype.AAAA, '2001:db8::2'),
            ('dot.powerdns.com.', dns.rdatatype.AAAA, '2001:db8::1'),
        ])

    def testNoHints(self):
        """
        SVCB: No hints

        Without ipv4hint / ipv6hint the additional section must stay empty.
        """
        self._checkSVCResponse('no-hints.svcb.tests.powerdns.com.', 64, [])

    def testEffectiveTarget(self):
        """
        SVCB: Effective target

        With a "." target the hint records are expected to be owned by
        the queried name itself.
        """
        name = 'effective-target.svcb.tests.powerdns.com.'
        self._checkSVCResponse(name, 64, [
            (name, dns.rdatatype.A, '192.0.2.1'),
            (name, dns.rdatatype.AAAA, '2001:db8::1'),
        ])

    def testHTTPS(self):
        """
        SVCB: HTTPS

        Same checks as for SVCB, but for the HTTPS query type (65).
        """
        name = 'https.svcb.tests.powerdns.com.'
        self._checkSVCResponse(name, 65, [
            (name, dns.rdatatype.A, '192.0.2.2'),
            (name, dns.rdatatype.AAAA, '2001:db8::2'),
        ])