+import asyncio
import time
from pysnmp.hlapi import *
+from pysnmp.hlapi.v3arch.asyncio import *
from recursortests import RecursorTest
# check memory usage > 0
self.assertGreater(results["1.3.6.1.4.1.43315.2.1.76.0"], 0)
- def _getSNMPStats(self, auth):
+ async def _getSNMPStats(self, auth):
results = {}
- for errorIndication, errorStatus, errorIndex, varBinds in nextCmd(
+ snmpEngine = SnmpEngine()
+ iterator = walk_cmd(
SnmpEngine(),
auth,
- UdpTransportTarget((self._snmpServer, self._snmpPort), timeout=self._snmpTimeout),
+ await UdpTransportTarget.create((self._snmpServer, self._snmpPort), timeout=self._snmpTimeout),
ContextData(),
ObjectType(ObjectIdentity(self._snmpOID)),
lookupMib=False,
- ):
- self.assertFalse(errorIndication)
- self.assertFalse(errorStatus)
- self.assertTrue(varBinds)
- for key, value in varBinds:
- keystr = key.prettyPrint()
- if not keystr.startswith(self._snmpOID):
- continue
- results[keystr] = value
+ )
+
+ list = [item async for item in iterator]
+
+ for errorIndication, errorStatus, errorIndex, varBinds in list:
+ self.assertFalse(errorIndication)
+ self.assertFalse(errorStatus)
+ self.assertTrue(varBinds)
+ for key, value in varBinds:
+ keystr = key.prettyPrint()
+ if not keystr.startswith(self._snmpOID):
+ continue
+ results[keystr] = value
+ snmpEngine.close_dispatcher()
return results
def _checkStats(self, auth):
# wait 1s so that the uptime is > 0
time.sleep(1)
- results = self._getSNMPStats(auth)
+ results = asyncio.run(self._getSNMPStats(auth))
self._checkStatsValues(results)
def testSNMPv2Stats(self):