]> git.ipfire.org Git - thirdparty/pdns.git/blame - regression-tests.recursor-dnssec/test_SNMP.py
Merge pull request #13927 from rgacogne/fix-fclose-warnings
[thirdparty/pdns.git] / regression-tests.recursor-dnssec / test_SNMP.py
CommitLineData
aa7a54c9
RG
1import time
2
3from pysnmp.hlapi import *
4
5from recursortests import RecursorTest
6
class TestSNMP(RecursorTest):
    """Regression tests for the statistics exposed by the recursor over SNMP.

    Each test walks the subtree rooted at ``_snmpOID`` (once via SNMPv2c,
    once via SNMPv3) and checks the entries found below it.
    """

    _snmpTimeout = 2.0
    _snmpServer = '127.0.0.1'
    _snmpPort = 161
    _snmpV2Community = 'secretcommunity'
    _snmpV3User = 'secretuser'
    _snmpV3AuthKey = 'mysecretauthkey'
    _snmpV3EncKey = 'mysecretenckey'
    # Root of the subtree walked by _getSNMPStats(); the statistics entries
    # checked by _checkStatsValues() are '<_snmpOID>.1.<index>.0'.
    _snmpOID = '1.3.6.1.4.1.43315.2'
    _queriesSent = 0
    _confdir = 'SNMP'
    _config_template = """
    snmp-agent=yes
    """

    def _statOID(self, index):
        """Return the OID string of statistics entry number *index*."""
        return self._snmpOID + '.1.' + str(index) + '.0'

    def _checkStatsValues(self, results):
        """Check the statistics entries collected by _getSNMPStats().

        *results* maps OID strings to the values returned by the agent.
        Fails the test if an expected entry is missing, is not a
        ``Counter64``, or if an entry exists past the expected range.
        """
        count = 148
        # NOTE(review): entries 1..count-1 are required and entry count+1 must
        # be absent, so entry `count` itself is never checked either way.
        # Confirm whether `count` is meant to be the last index or one past it.
        for i in range(1, count):
            oid = self._statOID(i)
            self.assertIn(oid, results)
            self.assertIsInstance(results[oid], Counter64)

        self.assertNotIn(self._statOID(count + 1), results)

        # check uptime > 0
        self.assertGreater(results[self._statOID(75)], 0)
        # check memory usage > 0
        self.assertGreater(results[self._statOID(76)], 0)

    def _getSNMPStats(self, auth):
        """Walk the ``_snmpOID`` subtree and return ``{OID string: value}``.

        *auth* is the pysnmp authentication object (community or USM user
        data) passed through to ``nextCmd``. Every response must be free of
        errors and carry at least one varbind, otherwise the test fails.
        """
        # Match on a component boundary so that a sibling subtree whose first
        # differing component merely starts with the same digits is not
        # mistaken for a child of _snmpOID.
        subtree = self._snmpOID + '.'
        results = {}
        walk = nextCmd(SnmpEngine(),
                       auth,
                       UdpTransportTarget((self._snmpServer, self._snmpPort), timeout=self._snmpTimeout),
                       ContextData(),
                       ObjectType(ObjectIdentity(self._snmpOID)),
                       lookupMib=False)
        for (errorIndication, errorStatus, _errorIndex, varBinds) in walk:
            self.assertFalse(errorIndication)
            self.assertFalse(errorStatus)
            self.assertTrue(varBinds)
            for key, value in varBinds:
                keystr = key.prettyPrint()
                # Varbinds outside the requested subtree are ignored.
                if keystr.startswith(subtree):
                    results[keystr] = value

        return results

    def _checkStats(self, auth):
        """Fetch the statistics using *auth* and validate them."""
        # wait 1s so that the uptime is > 0
        time.sleep(1)

        results = self._getSNMPStats(auth)
        self._checkStatsValues(results)

    def testSNMPv2Stats(self):
        """
        SNMP: Retrieve statistics via SNMPv2c
        """

        auth = CommunityData(self._snmpV2Community, mpModel=1)
        self._checkStats(auth)

    def testSNMPv3Stats(self):
        """
        SNMP: Retrieve statistics via SNMPv3
        """

        auth = UsmUserData(self._snmpV3User,
                           authKey=self._snmpV3AuthKey,
                           privKey=self._snmpV3EncKey,
                           authProtocol=usmHMACSHAAuthProtocol,
                           privProtocol=usmAesCfb128Protocol)
        self._checkStats(auth)