]>
git.ipfire.org Git - thirdparty/pdns.git/blob - regression-tests.dnsdist/test_SNMP.py
5 from pysnmp
.hlapi
import *
6 from dnsdisttests
import DNSDistTest
8 class TestSNMP(DNSDistTest
):
9 # wait 1s so that the uptime is > 0
10 _extraStartupSleep
= 1
12 _snmpServer
= '127.0.0.1'
14 _snmpV2Community
= 'secretcommunity'
15 _snmpV3User
= 'secretuser'
16 _snmpV3AuthKey
= 'mysecretauthkey'
17 _snmpV3EncKey
= 'mysecretenckey'
18 _snmpOID
= '1.3.6.1.4.1.43315.3'
20 _config_template
= """
21 newServer{address="127.0.0.1:%s", name="servername"}
23 setVerboseHealthChecks(true)
27 def _checkStatsValues(self
, results
, queriesCountersValue
):
28 for i
in list(range(1, 5)) + list(range(6, 20)) + list(range(24, 35)) + [ 35 ] :
29 oid
= self
._snmpOID
+ '.1.' + str(i
) + '.0'
30 self
.assertTrue(oid
in results
)
31 self
.assertTrue(isinstance(results
[oid
], Counter64
))
33 for i
in range(20, 23):
34 oid
= self
._snmpOID
+ '.1.' + str(i
) + '.0'
35 self
.assertTrue(isinstance(results
[oid
], OctetString
))
38 self
.assertGreater(results
['1.3.6.1.4.1.43315.3.1.24.0'], 0)
39 # check memory usage > 0
40 self
.assertGreater(results
['1.3.6.1.4.1.43315.3.1.25.0'], 0)
42 # check that the queries, responses and rdQueries counters are now at queriesCountersValue
44 oid
= self
._snmpOID
+ '.1.' + str(i
) + '.0'
45 self
.assertEqual(results
[oid
], queriesCountersValue
)
47 # the others counters (except for latency ones) should still be at 0
48 for i
in [3, 4, 6, 7, 8, 9, 10, 11, 12, 13, 26, 27, 29, 30, 31, 35, 36]:
49 oid
= self
._snmpOID
+ '.1.' + str(i
) + '.0'
50 self
.assertEqual(results
[oid
], 0)
52 # check the backend stats
56 for i
in [3, 4, 5, 6, 7, 11, 12, 13]:
57 oid
= self
._snmpOID
+ '.2.1.' + str(i
) + '.0'
58 self
.assertTrue(isinstance(results
[oid
], Counter64
))
59 for i
in [2, 8, 9, 10]:
60 oid
= self
._snmpOID
+ '.2.1.' + str(i
) + '.0'
61 self
.assertTrue(isinstance(results
[oid
], OctetString
))
64 self
.assertEqual(str(results
['1.3.6.1.4.1.43315.3.2.1.2.0']), "servername")
66 self
.assertEqual(results
['1.3.6.1.4.1.43315.3.2.1.4.0'], 1)
68 self
.assertEqual(results
['1.3.6.1.4.1.43315.3.2.1.5.0'], 0)
70 self
.assertEqual(results
['1.3.6.1.4.1.43315.3.2.1.6.0'], 0)
72 self
.assertEqual(results
['1.3.6.1.4.1.43315.3.2.1.7.0'], 0)
74 self
.assertEqual(str(results
['1.3.6.1.4.1.43315.3.2.1.8.0']), "up")
76 self
.assertEqual(str(results
['1.3.6.1.4.1.43315.3.2.1.9.0']), ("127.0.0.1:%s" % (self
._testServerPort
)))
78 self
.assertEqual(str(results
['1.3.6.1.4.1.43315.3.2.1.10.0']), "")
80 self
.assertEqual(results
['1.3.6.1.4.1.43315.3.2.1.12.0'], queriesCountersValue
)
82 self
.assertEqual(results
['1.3.6.1.4.1.43315.3.2.1.13.0'], 1)
84 def _getSNMPStats(self
, auth
):
86 for (errorIndication
, errorStatus
, errorIndex
, varBinds
) in nextCmd(SnmpEngine(),
88 UdpTransportTarget((self
._snmpServer
, self
._snmpPort
), timeout
=self
._snmpTimeout
),
90 ObjectType(ObjectIdentity(self
._snmpOID
)),
92 self
.assertFalse(errorIndication
)
93 self
.assertFalse(errorStatus
)
94 self
.assertTrue(varBinds
)
95 for key
, value
in varBinds
:
96 keystr
= key
.prettyPrint()
97 if not keystr
.startswith(self
._snmpOID
):
99 results
[keystr
] = value
103 def _checkStats(self
, auth
, name
):
105 results
= self
._getSNMPStats
(auth
)
106 self
._checkStatsValues
(results
, self
.__class
__._queriesSent
)
108 query
= dns
.message
.make_query(name
, 'A', 'IN', use_edns
=False)
109 response
= dns
.message
.make_response(query
)
110 rrset
= dns
.rrset
.from_text(name
,
115 response
.answer
.append(rrset
)
118 (receivedQuery
, receivedResponse
) = self
.sendUDPQuery(query
, response
)
119 self
.assertTrue(receivedQuery
)
120 self
.assertTrue(receivedResponse
)
121 receivedQuery
.id = query
.id
122 self
.assertEqual(query
, receivedQuery
)
123 self
.assertEqual(response
, receivedResponse
)
124 self
.__class
__._queriesSent
= self
.__class
__._queriesSent
+ 1
126 results
= self
._getSNMPStats
(auth
)
127 self
._checkStatsValues
(results
, self
.__class
__._queriesSent
)
129 def testSNMPv2Stats(self
):
131 SNMP: Retrieve statistics via SNMPv2c
134 auth
= CommunityData(self
._snmpV
2Community
, mpModel
=1)
135 name
= 'simplea.snmpv2c.tests.powerdns.com.'
136 self
._checkStats
(auth
, name
)
138 def testSNMPv3Stats(self
):
140 SNMP: Retrieve statistics via SNMPv3
143 auth
= UsmUserData(self
._snmpV
3User
,
144 authKey
=self
._snmpV
3AuthKey
,
145 privKey
=self
._snmpV
3EncKey
,
146 authProtocol
=usmHMACSHAAuthProtocol
,
147 privProtocol
=usmAesCfb128Protocol
)
148 name
= 'simplea.snmpv2.tests.powerdns.com.'
149 self
._checkStats
(auth
, name
)