]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.dnsdist/test_SNMP.py
Merge pull request #8795 from omoerbeek/rec-lua-docs-policytag
[thirdparty/pdns.git] / regression-tests.dnsdist / test_SNMP.py
CommitLineData
9f4eb5cc
RG
1#!/usr/bin/env python
2import dns
3import time
4
5from pysnmp.hlapi import *
6from dnsdisttests import DNSDistTest
7
class TestSNMP(DNSDistTest):
    """Check the statistics dnsdist exposes through its SNMP agent.

    The configuration declares a single backend named "servername" and
    enables the SNMP agent.  Each test walks the subtree rooted at
    ``_snmpOID``, validates the types and values found there, sends one DNS
    query through dnsdist and then validates the counters again.
    """

    _snmpTimeout = 2.0
    _snmpServer = '127.0.0.1'
    _snmpPort = 161
    _snmpV2Community = 'secretcommunity'
    _snmpV3User = 'secretuser'
    _snmpV3AuthKey = 'mysecretauthkey'
    _snmpV3EncKey = 'mysecretenckey'
    # Root of the subtree walked by _getSNMPStats(); every OID checked below
    # is derived from it.
    _snmpOID = '1.3.6.1.4.1.43315.3'
    # Number of queries sent so far.  Stored on the class (not the instance)
    # so that the count carries over from one test method to the next.
    _queriesSent = 0
    _config_template = """
    newServer{address="127.0.0.1:%s", name="servername"}
    snmpAgent(true)
    """

    def _statOID(self, index):
        """Return the OID of the global statistic number `index`."""
        return '%s.1.%d.0' % (self._snmpOID, index)

    def _backendOID(self, column):
        """Return the OID of column `column` in the first backend entry."""
        return '%s.2.1.%d.0' % (self._snmpOID, column)

    def _checkGlobalStats(self, results, queriesCountersValue):
        """Validate the types and values of the global statistics."""
        # every statistic in these index ranges has to be a Counter64
        counterIndexes = list(range(1, 5)) + list(range(6, 20)) + list(range(24, 36))
        for index in counterIndexes:
            oid = self._statOID(index)
            self.assertIn(oid, results)
            self.assertIsInstance(results[oid], Counter64)

        # whereas these ones are strings
        for index in range(20, 23):
            self.assertIsInstance(results[self._statOID(index)], OctetString)

        # check uptime > 0
        self.assertGreater(results[self._statOID(24)], 0)
        # check memory usage > 0
        self.assertGreater(results[self._statOID(25)], 0)

        # check that the queries, responses and rdQueries counters are now at queriesCountersValue
        for index in [1, 2, 28]:
            self.assertEqual(results[self._statOID(index)], queriesCountersValue)

        # the others counters (except for latency ones) should still be at 0
        for index in [3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 26, 27, 29, 30, 31, 35, 36]:
            self.assertEqual(results[self._statOID(index)], 0)

    def _checkBackendStats(self, results, queriesCountersValue):
        """Validate the types and values describing the single backend."""
        ## types
        for column in [3, 4, 5, 6, 7, 11, 12, 13]:
            self.assertIsInstance(results[self._backendOID(column)], Counter64)
        for column in [2, 8, 9, 10]:
            self.assertIsInstance(results[self._backendOID(column)], OctetString)

        ## name
        self.assertEqual(str(results[self._backendOID(2)]), "servername")
        ## weight
        self.assertEqual(results[self._backendOID(4)], 1)
        ## outstanding
        self.assertEqual(results[self._backendOID(5)], 0)
        ## qpslimit
        self.assertEqual(results[self._backendOID(6)], 0)
        ## reused
        self.assertEqual(results[self._backendOID(7)], 0)
        ## state
        self.assertEqual(str(results[self._backendOID(8)]), "up")
        ## address
        self.assertEqual(str(results[self._backendOID(9)]), ("127.0.0.1:%s" % (self._testServerPort)))
        ## pools
        self.assertEqual(str(results[self._backendOID(10)]), "")
        ## queries
        self.assertEqual(results[self._backendOID(12)], queriesCountersValue)
        ## order
        self.assertEqual(results[self._backendOID(13)], 1)

    def _checkStatsValues(self, results, queriesCountersValue):
        """Validate a complete SNMP walk result.

        `results` maps dotted OID strings to the values returned by the
        agent, as built by _getSNMPStats().  `queriesCountersValue` is the
        value the query-related counters are expected to hold.
        """
        self._checkGlobalStats(results, queriesCountersValue)

        # check the backend stats
        # NOTE(review): the dump below looks like a debugging aid; it is kept
        # so the output of the test is unchanged.
        print(results)
        self._checkBackendStats(results, queriesCountersValue)

    def _getSNMPStats(self, auth):
        """Walk the agent from `_snmpOID` and return the values found.

        `auth` is the pysnmp authentication object (community or USM user)
        handed to nextCmd().  The returned dict maps each dotted OID string
        to its value.  Any error reported by the walk, or an empty response,
        fails the test.
        """
        results = {}
        walk = nextCmd(SnmpEngine(),
                       auth,
                       UdpTransportTarget((self._snmpServer, self._snmpPort), timeout=self._snmpTimeout),
                       ContextData(),
                       ObjectType(ObjectIdentity(self._snmpOID)),
                       lookupMib=False)
        for (errorIndication, errorStatus, errorIndex, varBinds) in walk:
            self.assertFalse(errorIndication)
            self.assertFalse(errorStatus)
            self.assertTrue(varBinds)
            for key, value in varBinds:
                keystr = key.prettyPrint()
                # only keep what lives below our own subtree
                if keystr.startswith(self._snmpOID):
                    results[keystr] = value

        return results

    def _checkStats(self, auth, name):
        """Check the statistics before and after sending one query for `name`.

        `auth` is passed through to _getSNMPStats().  The class-wide
        `_queriesSent` counter is incremented once the query has been
        answered, and is the value the query counters are compared against.
        """
        # wait 1s so that the uptime is > 0
        time.sleep(1)

        results = self._getSNMPStats(auth)
        self._checkStatsValues(results, self.__class__._queriesSent)

        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
        response = dns.message.make_response(query)
        rrset = dns.rrset.from_text(name,
                                    3600,
                                    dns.rdataclass.IN,
                                    dns.rdatatype.A,
                                    '127.0.0.1')
        response.answer.append(rrset)

        # send a query
        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
        self.assertTrue(receivedQuery)
        self.assertTrue(receivedResponse)
        # align the IDs before comparing the messages
        # NOTE(review): presumably the query reaches the backend with a
        # different ID than the one the client used -- confirm.
        receivedQuery.id = query.id
        self.assertEqual(query, receivedQuery)
        self.assertEqual(response, receivedResponse)
        self.__class__._queriesSent += 1

        results = self._getSNMPStats(auth)
        self._checkStatsValues(results, self.__class__._queriesSent)

    def testSNMPv2Stats(self):
        """
        SNMP: Retrieve statistics via SNMPv2c
        """

        auth = CommunityData(self._snmpV2Community, mpModel=1)
        name = 'simplea.snmpv2c.tests.powerdns.com.'
        self._checkStats(auth, name)

    def testSNMPv3Stats(self):
        """
        SNMP: Retrieve statistics via SNMPv3
        """

        auth = UsmUserData(self._snmpV3User,
                           authKey=self._snmpV3AuthKey,
                           privKey=self._snmpV3EncKey,
                           authProtocol=usmHMACSHAAuthProtocol,
                           privProtocol=usmAesCfb128Protocol)
        # NOTE(review): the label says "snmpv2" although this is the v3 test;
        # left as is since the name is only used to build the query and its
        # canned response.
        name = 'simplea.snmpv2.tests.powerdns.com.'
        self._checkStats(auth, name)