Commit | Line | Data |
---|---|---|
651f3a09 RG |
1 | #!/usr/bin/env python |
2 | import dns | |
3 | from dnsdisttests import DNSDistTest | |
4 | ||
class TestSVCB(DNSDistTest):
    """Regression tests for dnsdist's SpoofSVCAction (SVCB / HTTPS answers).

    Each rule in the configuration matches one query type under one test
    suffix and answers it with locally configured service-binding records:
    QTypeRule(64) is SVCB and QTypeRule(65) is HTTPS. The tests then check
    how the ipv4hint / ipv6hint parameters show up in the additional section.

    All addresses are documentation ranges (192.0.2.0/24 and 2001:db8::/32),
    so nothing here points at a real host.

    NOTE(review): the tests assert only the count and rdtype of the answer
    section. The SvcParams themselves (mandatory, alpn, port, key7) and the
    AA bit requested with {aa=true} are never compared, so a regression that
    changes the advertised port or drops `mandatory` would go unnoticed.
    Consider asserting the full answer rdata.
    """

    # Lua configuration handed to dnsdist. key7 is the generic keyNNNN
    # spelling of SvcParamKey 7 (dohpath). The three SVCB rules request
    # aa=true, the HTTPS rule does not. The %s in newServer is filled in
    # outside this class (presumably with the backend port by the harness;
    # confirm against DNSDistTest).
    _config_template = """
    local basicSVC = { newSVCRecordParameters(1, "dot.powerdns.com.", { mandatory={"port"}, alpn={"dot"}, noDefaultAlpn=true, port=853, ipv4hint={ "192.0.2.1" }, ipv6hint={ "2001:db8::1" } }),
                       newSVCRecordParameters(2, "doh.powerdns.com.", { mandatory={"port"}, alpn={"h2"}, port=443, ipv4hint={ "192.0.2.2" }, ipv6hint={ "2001:db8::2" }, key7="/dns-query{?dns}" })
                     }
    addAction(AndRule{QTypeRule(64), SuffixMatchNodeRule("basic.svcb.tests.powerdns.com.")}, SpoofSVCAction(basicSVC, {aa=true}))

    local noHintsSVC = { newSVCRecordParameters(1, "dot.powerdns.com.", { mandatory={"port"}, alpn={"dot"}, noDefaultAlpn=true, port=853}),
                         newSVCRecordParameters(2, "doh.powerdns.com.", { mandatory={"port"}, alpn={"h2"}, port=443, key7="/dns-query{?dns}" })
                       }
    addAction(AndRule{QTypeRule(64), SuffixMatchNodeRule("no-hints.svcb.tests.powerdns.com.")}, SpoofSVCAction(noHintsSVC, {aa=true}))

    local effectiveTargetSVC = { newSVCRecordParameters(1, ".", { mandatory={"port"}, alpn={ "dot" }, noDefaultAlpn=true, port=853, ipv4hint={ "192.0.2.1" }, ipv6hint={ "2001:db8::1" }}),
                                 newSVCRecordParameters(2, ".", { mandatory={"port"}, alpn={ "h2" }, port=443, ipv4hint={ "192.0.2.1" }, ipv6hint={ "2001:db8::1" }, key7="/dns-query{?dns}"})
                               }
    addAction(AndRule{QTypeRule(64), SuffixMatchNodeRule("effective-target.svcb.tests.powerdns.com.")}, SpoofSVCAction(effectiveTargetSVC, {aa=true}))

    local httpsSVC = { newSVCRecordParameters(1, ".", { mandatory={"port"}, alpn={ "h2" }, noDefaultAlpn=true, port=8002, ipv4hint={ "192.0.2.2" }, ipv6hint={ "2001:db8::2" }}) }
    addAction(AndRule{QTypeRule(65), SuffixMatchNodeRule("https.svcb.tests.powerdns.com.")}, SpoofSVCAction(httpsSVC))

    newServer{address="127.0.0.1:%s"}
    """

    def _iterSpoofedAnswers(self, name, qtype):
        """Query *name* over UDP then TCP and yield each checked reply.

        Every reply must be non-empty and carry exactly one answer RRset
        whose type equals *qtype*. This is a generator, so the caller's own
        assertions on the UDP reply run before the TCP query is sent.
        """
        query = dns.message.make_query(name, qtype, 'IN')
        # dnsdist sets RA = RD on spoofed responses, so send the query
        # with RD cleared.
        query.flags &= ~dns.flags.RD

        for transport in ("sendUDPQuery", "sendTCPQuery"):
            send = getattr(self, transport)
            # No backend response is supplied (response=None): the answer
            # is expected to come from the spoofing rule itself.
            (_, reply) = send(query, response=None, useQueue=False)
            self.assertTrue(reply)
            self.assertEqual(len(reply.answer), 1)
            self.assertEqual(reply.answer[0].rdtype, qtype)
            yield reply

    def testBasic(self):
        """
        SVCB: Basic service binding

        Both records name an explicit target and carry hints, so four
        additional records are expected, each owned by its target name.
        """
        name = 'basic.svcb.tests.powerdns.com.'
        # Expected order: both A records first, then both AAAA records.
        expectedAdditional = [
            dns.rrset.from_text("doh.powerdns.com.", 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.2'),
            dns.rrset.from_text("dot.powerdns.com.", 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
            dns.rrset.from_text("doh.powerdns.com.", 60, dns.rdataclass.IN, dns.rdatatype.AAAA, '2001:db8::2'),
            dns.rrset.from_text("dot.powerdns.com.", 60, dns.rdataclass.IN, dns.rdatatype.AAAA, '2001:db8::1'),
        ]

        for reply in self._iterSpoofedAnswers(name, 64):
            self.assertEqual(len(reply.additional), 4)
            for got, want in zip(reply.additional, expectedAdditional):
                self.assertEqual(got, want)

    def testNoHints(self):
        """
        SVCB: No hints

        Without ipv4hint / ipv6hint the additional section must stay empty.
        """
        name = 'no-hints.svcb.tests.powerdns.com.'

        for reply in self._iterSpoofedAnswers(name, 64):
            self.assertEqual(len(reply.additional), 0)

    def testEffectiveTarget(self):
        """
        SVCB: Effective target

        The target is "." here, so the hint records are expected under the
        queried name itself.
        """
        name = 'effective-target.svcb.tests.powerdns.com.'
        expectedAdditional = [
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.1'),
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA, '2001:db8::1'),
        ]

        for reply in self._iterSpoofedAnswers(name, 64):
            self.assertEqual(len(reply.additional), 2)
            for got, want in zip(reply.additional, expectedAdditional):
                self.assertEqual(got, want)

    def testHTTPS(self):
        """
        SVCB: HTTPS

        Same shape as the effective-target case, but for query type 65
        (HTTPS) and a rule configured without {aa=true}.
        """
        name = 'https.svcb.tests.powerdns.com.'
        expectedAdditional = [
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.A, '192.0.2.2'),
            dns.rrset.from_text(name, 60, dns.rdataclass.IN, dns.rdatatype.AAAA, '2001:db8::2'),
        ]

        for reply in self._iterSpoofedAnswers(name, 65):
            self.assertEqual(len(reply.additional), 2)
            for got, want in zip(reply.additional, expectedAdditional):
                self.assertEqual(got, want)