]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.recursor-dnssec/test_Notify.py
1 import clientsubnetoption
8 from recursortests
import RecursorTest
# Recursor regression test class deriving from RecursorTest.
# NOTE(review): this listing embeds the original file's line numbers and skips
# several of them (e.g. 10 -> 20, 26 -> 31), so some class attributes are not
# visible here; _confdir and _wsPort used below are assigned on omitted lines.
10 class NotifyRecursorTest(RecursorTest
):
# Credentials substituted into the config template below; _apiKey is also sent
# as the 'x-api-key' header by checkRecordCacheMetrics.
20 _wsPassword
= 'secretpassword'
21 _apiKey
= 'secretapikey'
# Recursor configuration: packet cache disabled, 'example' served from an auth
# zone file, and NOTIFY allowed from 127.0.0.1 for 'example'.
# NOTE(review): only one %s placeholder is visible but four values are
# supplied; the remaining placeholders are presumably on the omitted template
# lines (original 27-30, 32-33) -- confirm against the full file.
22 _config_template
= """
23 disable-packetcache=yes
24 auth-zones=example=configs/%s/example.zone
25 allow-notify-from=127.0.0.1
26 allow-notify-for=example
31 webserver-address=127.0.0.1
34 """ % (_confdir
, _wsPort
, _wsPassword
, _apiKey
)
# Write the 'example' auth zone file into confdir, then delegate to the parent
# class's generateRecursorConfig.
#   cls     -- the test class (reads cls._SOA); the first parameter being `cls`
#              suggests a @classmethod decorator on the omitted original
#              line 36 -- TODO confirm.
#   confdir -- directory in which 'example.zone' is created.
37 def generateRecursorConfig(cls
, confdir
):
38 authzonepath
= os
.path
.join(confdir
, 'example.zone')
# The file is opened in 'w' mode, so any existing zone file is overwritten.
39 with
open(authzonepath
, 'w') as authzone
:
# Zone content: A records a-e all pointing at 192.0.2.42, plus 'f', a CNAME to
# itself (see the inline zone comment for why).
# NOTE(review): .format(soa=...) implies a {soa} placeholder, but none appears
# in the visible zone text; original line 41 is absent from this listing --
# confirm against the full file.
40 authzone
.write("""$ORIGIN example.
42 a 3600 IN A 192.0.2.42
43 b 3600 IN A 192.0.2.42
44 c 3600 IN A 192.0.2.42
45 d 3600 IN A 192.0.2.42
46 e 3600 IN A 192.0.2.42
47 f 3600 IN CNAME f ; CNAME loop: dirty trick to get a ServFail in an authzone
48 """.format(soa
=cls
._SOA
))
# Hand over to the base class to produce the rest of the configuration.
49 super(NotifyRecursorTest
, cls
).generateRecursorConfig(confdir
)
# Fetch the recursor statistics over HTTP and assert the record-cache counters.
#   expectedHits   -- value the 'cache-hits' entry must equal.
#   expectedMisses -- value the 'cache-misses' entry must equal.
# Fails the test (via unittest assertions) on a non-200 response, an empty JSON
# body, a counter mismatch, or when either counter was not found.
51 def checkRecordCacheMetrics(self
, expectedHits
, expectedMisses
):
# Authenticate against the API with the class's API key.
52 headers
= {'x-api-key': self
._apiKey
}
53 url
= 'http://127.0.0.1:' + str(self
._wsPort
) + '/api/v1/servers/localhost/statistics'
54 r
= requests
.get(url
, headers
=headers
, timeout
=self
._wsTimeout
)
56 self
.assertEqual(r
.status_code
, 200)
57 self
.assertTrue(r
.json())
# NOTE(review): the statement binding `entry` (presumably a loop over the JSON
# body) and the assignments to foundHits / foundMisses are on lines missing
# from this listing (original 58-61, 63, 66) -- confirm against the full file.
# Entry values are converted with int() before being compared.
62 if entry
['name'] == 'cache-hits':
64 self
.assertEqual(int(entry
['value']), expectedHits
)
65 elif entry
['name'] == 'cache-misses':
67 self
.assertEqual(int(entry
['value']), expectedMisses
)
# Both counters must have been seen; otherwise the checks above never ran.
69 self
.assertTrue(foundHits
)
70 self
.assertTrue(foundMisses
)
# Body of the NOTIFY test method. NOTE(review): its `def` line is absent from
# this listing (original lines 71-72 are skipped), as are the assignment of
# `qname` and the `res = sender(...)` calls inside the loops below.
# Starts by calling waitForTCPSocket on the web server port -- presumably it
# blocks until the port accepts connections; confirm in RecursorTest.
73 self
.waitForTCPSocket("127.0.0.1", self
._wsPort
)
# A query for qname with want_dnssec=True, and the answer it must contain.
76 query
= dns
.message
.make_query(qname
, 'A', want_dnssec
=True)
77 expected
= dns
.rrset
.from_text(qname
, 0, dns
.rdataclass
.IN
, 'A', '192.0.2.42')
# First pass: run the query once per transport (UDP, then TCP).
79 for method
in ("sendUDPQuery", "sendTCPQuery"):
80 sender
= getattr(self
, method
)
82 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
83 self
.assertRRsetInAnswer(res
, expected
)
# After the first pass the counters must read 1 hit and 1 miss.
85 self
.checkRecordCacheMetrics(1, 1)
87 # we should get a hit over UDP this time
88 res
= self
.sendUDPQuery(query
)
89 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
90 self
.assertRRsetInAnswer(res
, expected
)
91 self
.checkRecordCacheMetrics(2, 1)
93 # we should get a hit over TCP this time
94 res
= self
.sendTCPQuery(query
)
95 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
96 self
.assertRRsetInAnswer(res
, expected
)
97 self
.checkRecordCacheMetrics(3, 1)
# Build the NOTIFY: an 'example' SOA query message whose opcode is then
# overwritten with 4.
99 notify
= dns
.message
.make_query('example', 'SOA', want_dnssec
=False)
100 notify
.set_opcode(4) # notify
# NOTE(review): notifyexpected is not referenced by any visible line below.
101 notifyexpected
= dns
.rrset
.from_text('example.', 0, dns
.rdataclass
.IN
, 'SOA')
# Check the response per transport: NOERROR, opcode still 4, and the question
# echoed back as 'example. IN SOA'.
102 for method
in ("sendUDPQuery", "sendTCPQuery"):
103 sender
= getattr(self
, method
)
105 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
# NOTE(review): assertEquals is a deprecated alias of assertEqual and was
# removed in Python 3.12; the other assertions in this file use assertEqual.
106 self
.assertEquals(res
.opcode(), 4)
108 self
.assertEquals(res
.question
[0].to_text(), 'example. IN SOA')
# The counters are expected to be unchanged (3 hits, 1 miss) at this point.
110 self
.checkRecordCacheMetrics(3, 1)
# Final pass: query again over both transports. The expected counters move to
# 4 hits / 2 misses, i.e. one additional miss and one additional hit --
# presumably because the NOTIFY caused the cached record to be dropped; confirm.
112 for method
in ("sendUDPQuery", "sendTCPQuery"):
113 sender
= getattr(self
, method
)
115 self
.assertRcodeEqual(res
, dns
.rcode
.NOERROR
)
116 self
.assertRRsetInAnswer(res
, expected
)
118 self
.checkRecordCacheMetrics(4, 2)