]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_Notify.py
Merge pull request #13014 from jsoref/deprecated
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_Notify.py
CommitLineData
8662d268
O
1import clientsubnetoption
2import cookiesoption
3import dns
4import os
5import requests
6import subprocess
7
8from recursortests import RecursorTest
9
class NotifyRecursorTest(RecursorTest):
    """Check how DNS NOTIFY messages affect the record-cache counters.

    The recursor is configured with a local ``example`` auth zone, accepts
    NOTIFY from 127.0.0.1 for that zone, and exposes its statistics through
    the built-in web server so the test can read the ``cache-hits`` and
    ``cache-misses`` counters.
    """

    _auth_zones = {
        '8': {'threads': 1,
              'zones': ['ROOT']}
    }

    _confdir = 'Notify'
    # Web server / REST API settings; they are interpolated into the recursor
    # configuration below and reused by checkRecordCacheMetrics().
    _wsPort = 8042
    _wsTimeout = 2
    _wsPassword = 'secretpassword'
    _apiKey = 'secretapikey'
    _config_template = """
    disable-packetcache=yes
    auth-zones=example=configs/%s/example.zone
    allow-notify-from=127.0.0.1
    allow-notify-for=example
    quiet=no
    loglevel=9
    webserver=yes
    webserver-port=%d
    webserver-address=127.0.0.1
    webserver-password=%s
    api-key=%s
    """ % (_confdir, _wsPort, _wsPassword, _apiKey)

    @classmethod
    def generateRecursorConfig(cls, confdir):
        """Write the ``example`` zone file, then let the base class finish.

        :param confdir: directory in which ``example.zone`` is created and
            which is handed on to the parent implementation.
        """
        authzonepath = os.path.join(confdir, 'example.zone')
        with open(authzonepath, 'w') as authzone:
            authzone.write("""$ORIGIN example.
@ 3600 IN SOA {soa}
a 3600 IN A 192.0.2.42
b 3600 IN A 192.0.2.42
c 3600 IN A 192.0.2.42
d 3600 IN A 192.0.2.42
e 3600 IN A 192.0.2.42
f 3600 IN CNAME f ; CNAME loop: dirty trick to get a ServFail in an authzone
""".format(soa=cls._SOA))
        super(NotifyRecursorTest, cls).generateRecursorConfig(confdir)

    def checkRecordCacheMetrics(self, expectedHits, expectedMisses):
        """Assert the ``cache-hits`` / ``cache-misses`` statistics values.

        Fetches the statistics list from the recursor's REST API and fails
        the test if either counter is absent or differs from the expected
        value.

        :param expectedHits: expected integer value of ``cache-hits``.
        :param expectedMisses: expected integer value of ``cache-misses``.
        """
        headers = {'x-api-key': self._apiKey}
        url = 'http://127.0.0.1:{}/api/v1/servers/localhost/statistics'.format(self._wsPort)
        r = requests.get(url, headers=headers, timeout=self._wsTimeout)
        self.assertTrue(r)
        self.assertEqual(r.status_code, 200)
        # Decode the body once and reuse it for both the sanity check and
        # the scan below.
        content = r.json()
        self.assertTrue(content)

        # Both flags must start out False: otherwise the final assertions
        # could not detect a counter that is missing from the response.
        foundHits = False
        foundMisses = False
        for entry in content:
            if entry['name'] == 'cache-hits':
                foundHits = True
                self.assertEqual(int(entry['value']), expectedHits)
            elif entry['name'] == 'cache-misses':
                foundMisses = True
                self.assertEqual(int(entry['value']), expectedMisses)

        self.assertTrue(foundHits)
        self.assertTrue(foundMisses)

    def _queryBothTransports(self, query, expected):
        """Send *query* over UDP, then TCP, and check each answer.

        Every response must have rcode NOERROR and contain *expected* in its
        answer section.
        """
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            res = sender(query)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertRRsetInAnswer(res, expected)

    def testNotify(self):
        """Query, send NOTIFY for ``example``, then query again.

        The hit/miss counters are checked after every step.
        """
        self.waitForTCPSocket("127.0.0.1", self._wsPort)

        # first query
        qname = 'a.example.'
        query = dns.message.make_query(qname, 'A', want_dnssec=True)
        expected = dns.rrset.from_text(qname, 0, dns.rdataclass.IN, 'A', '192.0.2.42')

        # Two lookups in a row: the expected counters imply one miss
        # followed by one hit.
        self._queryBothTransports(query, expected)
        self.checkRecordCacheMetrics(1, 1)

        # we should get a hit over UDP this time
        res = self.sendUDPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        self.checkRecordCacheMetrics(2, 1)

        # we should get a hit over TCP this time
        res = self.sendTCPQuery(query)
        self.assertRcodeEqual(res, dns.rcode.NOERROR)
        self.assertRRsetInAnswer(res, expected)
        self.checkRecordCacheMetrics(3, 1)

        notifyOpcode = 4  # notify
        notify = dns.message.make_query('example', 'SOA', want_dnssec=False)
        notify.set_opcode(notifyOpcode)
        for method in ("sendUDPQuery", "sendTCPQuery"):
            sender = getattr(self, method)
            res = sender(notify)
            self.assertRcodeEqual(res, dns.rcode.NOERROR)
            self.assertEqual(res.opcode(), notifyOpcode)
            print(res)
            self.assertEqual(res.question[0].to_text(), 'example. IN SOA')

        # The NOTIFY exchange itself must not change the counters.
        self.checkRecordCacheMetrics(3, 1)

        # One more miss and one more hit are expected here, presumably
        # because the NOTIFY dropped the cached records for the zone.
        self._queryBothTransports(query, expected)
        self.checkRecordCacheMetrics(4, 2)