diff --git a/pysnmp/entity/rfc3413/cmdgen.py b/pysnmp/entity/rfc3413/cmdgen.py index 3018c5a4..6638efcf 100644 --- a/pysnmp/entity/rfc3413/cmdgen.py +++ b/pysnmp/entity/rfc3413/cmdgen.py @@ -12,7 +12,7 @@ from pysnmp import debug, error, nextid from pysnmp.entity.engine import SnmpEngine from pysnmp.entity.rfc3413 import config -from pysnmp.proto import errind, rfc1905 +from pysnmp.proto import errind from pysnmp.proto.api import v2c from pysnmp.proto.error import StatusInformation from pysnmp.proto.proxy import rfc2576 @@ -568,16 +568,6 @@ def processResponseVarBinds( errorIndication, varBinds = None, () elif not varBindTable: errorIndication, varBinds = errind.emptyResponse, () - else: - errorIndication, varBinds = v2c.apiBulkPDU.getNextVarBinds( - varBindTable[-1], v2c.apiPDU.getVarBinds(reqPDU) - ) - nonRepeaters = v2c.apiBulkPDU.getNonRepeaters(reqPDU) - if nonRepeaters: - varBinds = ( - v2c.apiBulkPDU.getVarBinds(reqPDU)[: int(nonRepeaters)] - + varBinds[int(nonRepeaters) :] - ) if not cbFun( snmpEngine, @@ -596,43 +586,3 @@ def processResponseVarBinds( if not varBinds: return # no more objects available - - v2c.apiBulkPDU.setRequestID(reqPDU, v2c.getNextRequestID()) - v2c.apiBulkPDU.setVarBinds(reqPDU, varBinds) - - try: - self.sendPdu( - snmpEngine, - targetName, - contextEngineId, - contextName, - reqPDU, - self.processResponseVarBinds, - ( - targetName, - nonRepeaters, - maxRepetitions, - contextEngineId, - contextName, - reqPDU, - cbFun, - cbCtx, - ), - ) - - except StatusInformation: - statusInformation = sys.exc_info()[1] - debug.logger & debug.FLAG_APP and debug.logger( - "processResponseVarBinds: sendPduHandle {}: _sendPdu() failed with {!r}".format( - sendRequestHandle, statusInformation - ) - ) - cbFun( - snmpEngine, - sendRequestHandle, - statusInformation["errorIndication"], - 0, - 0, - (), - cbCtx, - ) # type: ignore diff --git a/pysnmp/hlapi/v1arch/asyncio/cmdgen.py b/pysnmp/hlapi/v1arch/asyncio/cmdgen.py index e9d79778..f04d7e0f 100644 --- a/pysnmp/hlapi/v1arch/asyncio/cmdgen.py +++ b/pysnmp/hlapi/v1arch/asyncio/cmdgen.py @@ -5,7 +5,7 @@ # License: https://www.pysnmp.com/pysnmp/license.html # from pysnmp.hlapi.v1arch.auth import * -from pysnmp.hlapi.varbinds import * +from pysnmp.hlapi import varbinds from pysnmp.hlapi.v1arch.asyncio.transport import * from pysnmp.smi.rfc1902 import * from pysnmp.proto import api @@ -14,9 +14,8 @@ __all__ = ["getCmd", "nextCmd", "setCmd", "bulkCmd", "isEndOfMib"] -VB_PROCESSOR = CommandGeneratorVarBinds() - -isEndOfMib = lambda varBinds: not api.v2c.apiPDU.getNextVarBinds(varBinds)[1] +VB_PROCESSOR = varbinds.CommandGeneratorVarBinds() +isEndOfMib = varbinds.isEndOfMib async def getCmd(snmpDispatcher, authData, transportTarget, *varBinds, **options): diff --git a/pysnmp/hlapi/v3arch/asyncio/cmdgen.py b/pysnmp/hlapi/v3arch/asyncio/cmdgen.py index 8069038a..e72a592c 100644 --- a/pysnmp/hlapi/v3arch/asyncio/cmdgen.py +++ b/pysnmp/hlapi/v3arch/asyncio/cmdgen.py @@ -9,6 +9,8 @@ # Zachary Lorusso # Modified by Ilya Etingof # +# Copyright (C) 2024, LeXtudio Inc. +# # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # @@ -32,7 +34,6 @@ # THE POSSIBILITY OF SUCH DAMAGE. # import asyncio -import sys from typing import AsyncGenerator from pysnmp.entity.engine import SnmpEngine @@ -41,7 +42,7 @@ from pysnmp.hlapi.v3arch.asyncio.context import ContextData from pysnmp.hlapi.v3arch.asyncio.lcd import CommandGeneratorLcdConfigurator from pysnmp.hlapi.v3arch.asyncio.transport import AbstractTransportTarget -from pysnmp.hlapi.varbinds import CommandGeneratorVarBinds +from pysnmp.hlapi import varbinds from pysnmp.proto import errind from pysnmp.proto.rfc1902 import Integer32, Null from pysnmp.proto.rfc1905 import EndOfMibView, endOfMibView @@ -53,26 +54,14 @@ "nextCmd", "setCmd", "bulkCmd", - "isEndOfMib", "walkCmd", "bulkWalkCmd", + "isEndOfMib", ] -VB_PROCESSOR = CommandGeneratorVarBinds() +VB_PROCESSOR = varbinds.CommandGeneratorVarBinds() LCD = CommandGeneratorLcdConfigurator() - - -def isEndOfMib(var_binds): # noqa: N816 - """ - Check if the given variable bindings indicate the end of the MIB. - - Parameters: - var_binds (list): A list of variable bindings. - - Returns: - bool: True if it is the end of the MIB, False otherwise. - """ - return not v2c.apiPDU.getNextVarBinds(var_binds)[1] +isEndOfMib = varbinds.isEndOfMib async def getCmd( @@ -82,7 +71,7 @@ async def getCmd( contextData: ContextData, *varBinds, **options -) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]": +) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP GET query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -203,7 +192,7 @@ async def setCmd( contextData: ContextData, *varBinds, **options -) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]": +) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP SET query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -324,7 +313,7 @@ async def nextCmd( contextData: ContextData, *varBinds, **options -) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]": +) -> "tuple[errind.ErrorIndication, Integer32 | str | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP GETNEXT query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -462,9 +451,9 @@ async def bulkCmd( contextData: ContextData, nonRepeaters: int, maxRepetitions: int, - *varBinds, + *varBinds: ObjectType, **options -) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, tuple[tuple[ObjectType, ...], ...]]": +) -> "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP GETBULK query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -631,10 +620,10 @@ async def walkCmd( authData: "CommunityData | UsmUserData", transportTarget: AbstractTransportTarget, contextData: ContextData, - *varBinds, + varBind: ObjectType, **options ) -> AsyncGenerator[ - "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]", + "tuple[errind.ErrorIndication | None, Integer32 | str | int | None, Integer32 | int | None, list[ObjectType]]", None, ]: r"""Creates a generator to perform one or more SNMP GETNEXT queries. @@ -657,8 +646,8 @@ async def walkCmd( contextData : :py:class:`~pysnmp.hlapi.v3arch.asyncio.ContextData` Class instance representing SNMP ContextEngineId and ContextName values. - \*varBinds : :py:class:`~pysnmp.smi.rfc1902.ObjectType` - One or more class instances representing MIB variables to place + varBind : :py:class:`~pysnmp.smi.rfc1902.ObjectType` + One class instance representing MIB variables to place into SNMP request. Other Parameters @@ -734,19 +723,18 @@ async def walkCmd( maxRows = options.get("maxRows", 0) maxCalls = options.get("maxCalls", 0) - initialVars = [x[0] for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, varBinds)] + initialVars = [x[0] for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, [varBind])] totalRows = totalCalls = 0 while True: - previousVarBinds = varBinds - if varBinds: + if varBind: errorIndication, errorStatus, errorIndex, varBindTable = await nextCmd( snmpEngine, authData, transportTarget, contextData, - *[(x[0], Null("")) for x in varBinds], + *[(varBind[0], Null(""))], **dict(lookupMib=options.get("lookupMib", True)) ) if ( @@ -757,7 +745,7 @@ async def walkCmd( errorIndication = None if errorIndication: - yield (errorIndication, errorStatus, errorIndex, varBinds) + yield (errorIndication, errorStatus, errorIndex, [varBind]) return elif errorStatus: if errorStatus == 2: @@ -765,23 +753,22 @@ async def walkCmd( # from SNMPv1 Agent through internal pysnmp proxy. errorStatus = 0 errorIndex = 0 - yield (errorIndication, errorStatus, errorIndex, varBinds) return else: stopFlag = True - varBinds = varBindTable[0] - - for col, varBind in enumerate(varBinds): - name, val = varBind - if isinstance(val, Null) or isinstance(val, EndOfMibView): - varBinds[col] = previousVarBinds[col][0], endOfMibView + varBind = varBindTable[0][0] - if not lexicographicMode and not initialVars[col].isPrefixOf(name): - varBinds[col] = previousVarBinds[col][0], endOfMibView + name, val = varBind + foundEnding = isinstance(val, Null) or isinstance(val, EndOfMibView) + foundBeyond = not lexicographicMode and not initialVars[0].isPrefixOf( + name + ) + if foundEnding or foundBeyond: + return - if stopFlag and varBinds[col][1] is not endOfMibView: - stopFlag = False + if stopFlag and varBind[1] is not endOfMibView: + stopFlag = False if stopFlag: return @@ -790,14 +777,15 @@ async def walkCmd( totalCalls += 1 else: errorIndication = errorStatus = errorIndex = None - varBinds = [] + varBind = None - initialVarBinds = (yield errorIndication, errorStatus, errorIndex, varBinds) + initialVarBinds = (yield errorIndication, errorStatus, errorIndex, [varBind]) if initialVarBinds: - varBinds = initialVarBinds + varBind = initialVarBinds[0] initialVars = [ - x[0] for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, varBinds) + x[0] + for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, initialVarBinds) ] if maxRows and totalRows >= maxRows: @@ -812,12 +800,12 @@ async def bulkWalkCmd( authData: "CommunityData | UsmUserData", transportTarget: AbstractTransportTarget, contextData: ContextData, - nonRepeaters, - maxRepetitions, - *varBinds, + nonRepeaters: int, + maxRepetitions: int, + varBind: ObjectType, **options ) -> AsyncGenerator[ - "tuple[errind.ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]", + "tuple[errind.ErrorIndication | None, Integer32 | int | None, Integer32 | int | None, list[ObjectType]]", None, ]: r"""Creates a generator to perform one or more SNMP GETBULK queries. @@ -850,8 +838,8 @@ async def bulkWalkCmd( `nonRepeaters`). Remote SNMP engine may choose lesser value than requested. - \*varBinds : :py:class:`~pysnmp.smi.rfc1902.ObjectType` - One or more class instances representing MIB variables to place + varBind : :py:class:`~pysnmp.smi.rfc1902.ObjectType` + One class instance representing MIB variables to place into SNMP request. Other Parameters @@ -932,111 +920,100 @@ async def bulkWalkCmd( maxRows = options.get("maxRows", 0) maxCalls = options.get("maxCalls", 0) - initialVars = [x[0] for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, varBinds)] - nullVarBinds = [False] * len(initialVars) + initialVars = [x[0] for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, [varBind])] totalRows = totalCalls = 0 - stopFlag = False - while not stopFlag: + varBinds = [varBind] + + while True: if maxRows and totalRows < maxRows: maxRepetitions = min(maxRepetitions, maxRows - totalRows) - previousVarBinds = varBinds + if varBinds: + errorIndication, errorStatus, errorIndex, varBindTable = await bulkCmd( + snmpEngine, + authData, + transportTarget, + contextData, + nonRepeaters, + maxRepetitions, + *[ObjectType(varBinds[-1][0], Null(""))], + **dict(lookupMib=options.get("lookupMib", True)) + ) - errorIndication, errorStatus, errorIndex, varBindTable = await bulkCmd( - snmpEngine, - authData, - transportTarget, - contextData, - nonRepeaters, - maxRepetitions, - *[(x[0], Null("")) for x in varBinds], - **dict(lookupMib=options.get("lookupMib", True)) - ) + if ( + ignoreNonIncreasingOid + and errorIndication + and isinstance(errorIndication, errind.OidNotIncreasing) + ): + errorIndication = None - if ( - ignoreNonIncreasingOid - and errorIndication - and isinstance(errorIndication, errind.OidNotIncreasing) - ): - errorIndication = None - - if errorIndication: - yield ( - errorIndication, - errorStatus, - errorIndex, - varBindTable and varBindTable[0] or [], - ) - if errorIndication != errind.requestTimedOut: + if errorIndication: + yield ( + errorIndication, + errorStatus, + errorIndex, + varBindTable[0] and varBinds, + ) + if errorIndication != errind.requestTimedOut: + return + elif errorStatus: + if errorStatus == 2: + # Hide SNMPv1 noSuchName error which leaks in here + # from SNMPv1 Agent through internal pysnmp proxy. + errorStatus = 0 + errorIndex = 0 + yield ( + errorIndication, + errorStatus, + errorIndex, + varBinds, + ) return - elif errorStatus: - if errorStatus == 2: - # Hide SNMPv1 noSuchName error which leaks in here - # from SNMPv1 Agent through internal pysnmp proxy. - errorStatus = 0 - errorIndex = 0 - yield ( - errorIndication, - errorStatus, - errorIndex, - varBindTable and varBindTable[0] or [], - ) - return - else: - for row in range(len(varBindTable)): + else: stopFlag = True - if len(varBindTable[row]) != len(initialVars): - varBindTable = row and varBindTable[: row - 1] or [] - break - for col in range(len(varBindTable[row])): - name, val = varBindTable[row][col] - if row: - previousVarBinds = varBindTable[row - 1] - if nullVarBinds[col]: - varBindTable[row][col] = previousVarBinds[col][0], endOfMibView - continue - stopFlag = False - if isinstance(val, Null) or isinstance(val, EndOfMibView): - varBindTable[row][col] = previousVarBinds[col][0], endOfMibView - nullVarBinds[col] = True - if not lexicographicMode and not initialVars[col].isPrefixOf(name): - varBindTable[row][col] = previousVarBinds[col][0], endOfMibView - if len(varBindTable) == 1: - stopFlag = True - break - else: - nullVarBinds[col] = True - if stopFlag: - varBindTable = row and varBindTable[: row - 1] or [] - break + varBinds = varBindTable[0] + + for col, varBind in enumerate(varBinds): + name, val = varBind + foundEnding = isinstance(val, Null) or isinstance(val, EndOfMibView) + foundBeyond = not lexicographicMode and not initialVars[ + 0 + ].isPrefixOf(name) + if foundEnding or foundBeyond: + stopFlag = True + result = varBinds[:col] + if len(result) > 0: + yield ( + errorIndication, + errorStatus, + errorIndex, + result, + ) + return - totalRows += len(varBindTable) - totalCalls += 1 + if stopFlag and varBinds[col][1] is not endOfMibView: + stopFlag = False - if maxRows and totalRows >= maxRows: - if totalRows > maxRows: - varBindTable = varBindTable[: -(totalRows - maxRows)] - stopFlag = True + if stopFlag: + return - if maxCalls and totalCalls >= maxCalls: - stopFlag = True + totalRows += 1 + totalCalls += 1 + else: + errorIndication = errorStatus = errorIndex = None + varBinds = [] - varBinds = varBindTable and varBindTable[-1] or [] + initialVarBinds = (yield errorIndication, errorStatus, errorIndex, varBinds) + if initialVarBinds: + varBinds = initialVarBinds + initialVars = [ + x[0] for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, varBinds) + ] - for varBindRow in varBindTable: - initialVarBinds = ( - yield errorIndication, - errorStatus, - errorIndex, - varBindRow, - ) + if maxRows and totalRows >= maxRows: + return - if initialVarBinds: - varBinds = initialVarBinds - initialVars = [ - x[0] - for x in VB_PROCESSOR.makeVarBinds(snmpEngine.cache, varBinds) - ] - nullVarBinds = [False] * len(initialVars) + if maxCalls and totalCalls >= maxCalls: + return diff --git a/pysnmp/hlapi/v3arch/asyncio/slim.py b/pysnmp/hlapi/v3arch/asyncio/slim.py index 9b55c0c5..a3c8b3e5 100644 --- a/pysnmp/hlapi/v3arch/asyncio/slim.py +++ b/pysnmp/hlapi/v3arch/asyncio/slim.py @@ -70,7 +70,7 @@ async def get( *varBinds, timeout: int = 1, retries: int = 5, - ) -> "tuple[ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]": + ) -> "tuple[ErrorIndication, Integer32 | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP GET query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -157,7 +157,7 @@ async def next( *varBinds, timeout: int = 1, retries: int = 5, - ) -> "tuple[ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]": + ) -> "tuple[ErrorIndication, Integer32 | str | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP GETNEXT query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -250,7 +250,7 @@ async def bulk( *varBinds, timeout: int = 1, retries: int = 5, - ) -> "tuple[ErrorIndication, Integer32 | int, Integer32 | int, tuple[tuple[ObjectType, ...], ...]]": + ) -> "tuple[ErrorIndication, Integer32 | str | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP GETBULK query. When iterator gets advanced by :py:mod:`asyncio` main loop, @@ -373,7 +373,7 @@ async def set( *varBinds, timeout: int = 1, retries: int = 5, - ) -> "tuple[ErrorIndication, Integer32 | int, Integer32 | int, tuple[ObjectType]]": + ) -> "tuple[ErrorIndication, Integer32 | int, Integer32 | int, list[ObjectType]]": r"""Creates a generator to perform SNMP SET query. When iterator gets advanced by :py:mod:`asyncio` main loop, diff --git a/pysnmp/hlapi/varbinds.py b/pysnmp/hlapi/varbinds.py index e61ebe91..c7eb16c1 100644 --- a/pysnmp/hlapi/varbinds.py +++ b/pysnmp/hlapi/varbinds.py @@ -6,10 +6,24 @@ # from typing import Any, Dict +from pysnmp.proto.api import v2c from pysnmp.smi import builder, view from pysnmp.smi.rfc1902 import NotificationType, ObjectIdentity, ObjectType -__all__ = ["CommandGeneratorVarBinds", "NotificationOriginatorVarBinds"] +__all__ = ["CommandGeneratorVarBinds", "NotificationOriginatorVarBinds", "isEndOfMib"] + + +def isEndOfMib(var_binds): # noqa: N816 + """ + Check if the given variable bindings indicate the end of the MIB. + + Parameters: + var_binds (list): A list of variable bindings. + + Returns: + bool: True if it is the end of the MIB, False otherwise. + """ + return not v2c.apiPDU.getNextVarBinds(var_binds)[1] class MibViewControllerManager: diff --git a/pysnmp/proto/api/v2c.py b/pysnmp/proto/api/v2c.py index 79dd52bc..e97d3f0c 100644 --- a/pysnmp/proto/api/v2c.py +++ b/pysnmp/proto/api/v2c.py @@ -76,10 +76,9 @@ def getNextVarBinds(self, varBinds, origVarBinds=None): ): nonNulls -= 1 elif origVarBinds is not None: - if ( - ObjectIdentifier(origVarBinds[idx][0]).asTuple() - >= varBinds[idx][0].asTuple() - ): + seed = ObjectIdentifier(origVarBinds[idx][0]).asTuple() + found = varBinds[idx][0].asTuple() + if seed >= found: errorIndication = errind.oidNotIncreasing rspVarBinds.insert(0, (varBinds[idx][0], null)) @@ -159,34 +158,6 @@ def getMaxRepetitions(pdu): def setMaxRepetitions(pdu, value): pdu.setComponentByPosition(2, value) - def getVarBindTable(self, reqPDU, rspPDU): - nonRepeaters = self.getNonRepeaters(reqPDU) - - reqVarBinds = self.getVarBinds(reqPDU) - - N = min(int(nonRepeaters), len(reqVarBinds)) - - rspVarBinds = self.getVarBinds(rspPDU) - - # shortcut for the most trivial case - if N == 0 and len(reqVarBinds) == 1: - return [[vb] for vb in rspVarBinds] - - R = max(len(reqVarBinds) - N, 0) - - varBindTable = [] - - if R: - for i in range(0, len(rspVarBinds) - N, R): - varBindRow = rspVarBinds[:N] + rspVarBinds[N + i : N + R + i] - # ignore stray OIDs / non-rectangular table - if len(varBindRow) == N + R: - varBindTable.append(varBindRow) - elif N: - varBindTable.append(rspVarBinds[:N]) - - return varBindTable - apiBulkPDU = BulkPDUAPI() # noqa: N816 diff --git a/tests/hlapi/asyncio/manager/cmdgen/test_v1_walk.py b/tests/hlapi/asyncio/manager/cmdgen/test_v1_walk.py index 7a34b299..06ae82f3 100644 --- a/tests/hlapi/asyncio/manager/cmdgen/test_v1_walk.py +++ b/tests/hlapi/asyncio/manager/cmdgen/test_v1_walk.py @@ -31,6 +31,40 @@ async def test_v1_walk(): assert len(varBinds) == 1 assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysUpTime.0" - assert len(objects_list), 50 + assert len(objects_list) == 267 + + snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v1_walk_subtree(): + async with AgentContextManager(): + snmpEngine = SnmpEngine() + objects = walkCmd( + snmpEngine, + CommunityData("public", mpModel=0), + await UdpTransportTarget.create(("localhost", AGENT_PORT)), + ContextData(), + ObjectType(ObjectIdentity("SNMPv2-MIB", "system")), + lexicographicMode=False, + ) + + objects_list = [item async for item in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 1 + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysDescr.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[1] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 1 + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" + + assert len(objects_list) == 8 snmpEngine.closeDispatcher() diff --git a/tests/hlapi/asyncio/manager/cmdgen/test_v2_walk.py b/tests/hlapi/asyncio/manager/cmdgen/test_v2_walk.py new file mode 100644 index 00000000..01fa2037 --- /dev/null +++ b/tests/hlapi/asyncio/manager/cmdgen/test_v2_walk.py @@ -0,0 +1,70 @@ +import pytest +from pysnmp.hlapi.v3arch.asyncio import * +from tests.agent_context import AGENT_PORT, AgentContextManager + + +@pytest.mark.asyncio +async def test_v2_walk(): # some agents have different v2 GET NEXT behavior + async with AgentContextManager(): + snmpEngine = SnmpEngine() + objects = walkCmd( + snmpEngine, + CommunityData("public"), + await UdpTransportTarget.create(("localhost", AGENT_PORT)), + ContextData(), + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)), + ) + + objects_list = [item async for item in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 1 + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[1] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 1 + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysUpTime.0" + + assert len(objects_list) == 267 + + snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v2_walk_subtree(): + async with AgentContextManager(): + snmpEngine = SnmpEngine() + objects = walkCmd( + snmpEngine, + CommunityData("public"), + await UdpTransportTarget.create(("localhost", AGENT_PORT)), + ContextData(), + ObjectType(ObjectIdentity("SNMPv2-MIB", "system")), + lexicographicMode=False, + ) + + objects_list = [item async for item in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 1 + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysDescr.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[1] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 1 + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" + + assert len(objects_list) == 8 + + snmpEngine.closeDispatcher() diff --git a/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulk.py b/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulk.py index 27b8c5d6..a7d4187f 100644 --- a/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulk.py +++ b/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulk.py @@ -19,13 +19,36 @@ async def test_v2c_bulk(num_bulk): 0, num_bulk, ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)), + retries=0, ) assert errorIndication is None assert errorStatus == 0 - assert len(varBinds) == num_bulk + assert len(varBinds) == 1 assert varBinds[0][0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" if num_bulk > 1: - assert varBinds[1][0][0].prettyPrint() == "SNMPv2-MIB::sysUpTime.0" + assert varBinds[0][1][0].prettyPrint() == "SNMPv2-MIB::sysUpTime.0" if num_bulk > 2: - assert varBinds[2][0][0].prettyPrint() == "SNMPv2-MIB::sysContact.0" + assert varBinds[0][2][0].prettyPrint() == "SNMPv2-MIB::sysContact.0" + + +@pytest.mark.asyncio +async def test_v2c_bulk_multiple_input(): + mib_objects = [ + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysContact")), + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysORIndex")), + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysORDescr")), + ] + async with AgentContextManager(): + with Slim() as slim: + errorIndication, errorStatus, errorIndex, varBinds = await slim.bulk( + "public", "demo.pysnmp.com", 161, 1, 2, *mib_objects, retries=0 + ) + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 1 + assert len(varBinds[0]) == 5 + + +# snmpbulkget -v2c -c public -C n1 -C r2 localhost 1.3.6.1.2.1.1.4 1.3.6.1.2.1.1.9.1.1 1.3.6.1.2.1.1.9.1.3 diff --git a/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulkwalk.py b/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulkwalk.py index 2156e091..c2f031b5 100644 --- a/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulkwalk.py +++ b/tests/hlapi/asyncio/manager/cmdgen/test_v2c_bulkwalk.py @@ -5,16 +5,18 @@ @pytest.mark.asyncio -async def test_v2c_get_table_bulk(): +@pytest.mark.parametrize("max_repetitions", [1, 2, 3, 4, 5, 6, 10, 12, 15, 20, 30]) +async def test_v2c_get_table_bulk(max_repetitions): async with AgentContextManager(): snmpEngine = SnmpEngine() objects = bulkWalkCmd( snmpEngine, CommunityData("public"), - await UdpTransportTarget.create(("localhost", AGENT_PORT)), + # await UdpTransportTarget.create(("localhost", AGENT_PORT)), + await Udp6TransportTarget.create(("demo.pysnmp.com", 161)), ContextData(), 0, - 4, + max_repetitions, ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)), ) @@ -24,23 +26,23 @@ async def test_v2c_get_table_bulk(): assert errorIndication is None assert errorStatus == 0 - assert len(varBinds) == 1 + assert len(varBinds) == max_repetitions assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" errorIndication, errorStatus, errorIndex, varBinds = objects_list[1] assert errorIndication is None assert errorStatus == 0 - assert len(varBinds) == 1 - assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysUpTime.0" + assert len(varBinds) == max_repetitions + # assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysName.0" - assert len(objects_list), 50 + assert len(objects_list) == 60 / max_repetitions snmpEngine.closeDispatcher() @pytest.mark.asyncio -async def test_v2c_get_table_bulk_0_4(): +async def test_v2c_get_table_bulk_0_4_subtree(): async with AgentContextManager(): snmpEngine = SnmpEngine() index = 0 @@ -56,12 +58,15 @@ async def test_v2c_get_table_bulk_0_4(): ): assert errorIndication is None assert errorStatus == 0 - assert len(varBinds) == 1 + assert len(varBinds) == 4 if index == 0: assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::snmpInPkts.0" if index == 1: - assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::snmpOutPkts.0" + assert ( + varBinds[0][0].prettyPrint() + == "SNMPv2-MIB::snmpInBadCommunityUses.0" + ) if index == 26: assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::snmpSilentDrops.0" @@ -71,13 +76,13 @@ async def test_v2c_get_table_bulk_0_4(): index += 1 - assert index == 28 + assert index == 7 snmpEngine.closeDispatcher() @pytest.mark.asyncio -async def test_v2c_get_table_bulk_0_1(): +async def test_v2c_get_table_bulk_0_1_subtree(): async with AgentContextManager(): snmpEngine = SnmpEngine() index = 0 @@ -111,3 +116,205 @@ async def test_v2c_get_table_bulk_0_1(): assert index == 28 snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v2c_get_table_bulk_0_7(): + async with AgentContextManager(): + snmpEngine = SnmpEngine() + max_repetitions = 7 + objects = bulkWalkCmd( + snmpEngine, + CommunityData("public"), + # await UdpTransportTarget.create(("localhost", AGENT_PORT)), + await Udp6TransportTarget.create(("demo.pysnmp.com", 161)), + ContextData(), + 0, + max_repetitions, + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)), + ) + + objects_list = [obj async for obj in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[1] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + # assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysName.0" + + assert len(objects_list) == 9 + snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v2c_get_table_bulk_0_8(): + snmpEngine = SnmpEngine() + max_repetitions = 8 + objects = bulkWalkCmd( + snmpEngine, + CommunityData("public"), + # await UdpTransportTarget.create(("localhost", AGENT_PORT)), + await Udp6TransportTarget.create(("demo.pysnmp.com", 161)), + ContextData(), + 0, + max_repetitions, + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)), + ) + + objects_list = [obj async for obj in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[1] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + # assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysName.0" + + assert len(objects_list) == 8 + snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v2c_get_table_bulk_0_31(): + async with AgentContextManager(): + snmpEngine = SnmpEngine() + max_repetitions = 31 + objects = bulkWalkCmd( + snmpEngine, + CommunityData("public"), + # await UdpTransportTarget.create(("localhost", AGENT_PORT)), + await Udp6TransportTarget.create(("demo.pysnmp.com", 161)), + ContextData(), + 0, + max_repetitions, + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)), + ) + + objects_list = [obj async for obj in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[1] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == 60 - max_repetitions + # assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysName.0" + + assert len(objects_list) == 2 + snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v2c_get_table_bulk_0_60(): + async with AgentContextManager(): + snmpEngine = SnmpEngine() + max_repetitions = 60 + objects = bulkWalkCmd( + snmpEngine, + CommunityData("public"), + # await UdpTransportTarget.create(("localhost", AGENT_PORT)), + await Udp6TransportTarget.create(("demo.pysnmp.com", 161)), + ContextData(), + 0, + max_repetitions, + ObjectType(ObjectIdentity("SNMPv2-MIB", "sysDescr", 0)), + ) + + objects_list = [obj async for obj in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysObjectID.0" + + assert len(objects_list) == 1 + snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v2c_get_table_bulk_0_5_subtree(): + async with AgentContextManager(): + snmpEngine = SnmpEngine() + max_repetitions = 5 + objects = bulkWalkCmd( + snmpEngine, + CommunityData("public"), + # await UdpTransportTarget.create(("localhost", AGENT_PORT)), + await Udp6TransportTarget.create(("demo.pysnmp.com", 161)), + ContextData(), + 0, + max_repetitions, + ObjectType(ObjectIdentity("SNMPv2-MIB", "system")), + lexicographicMode=False, + ) + + objects_list = [obj async for obj in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysDescr.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[3] + assert len(varBinds) == 1 + + assert len(objects_list) == 4 + snmpEngine.closeDispatcher() + + +@pytest.mark.asyncio +async def test_v2c_get_table_bulk_0_6_subtree(): + async with AgentContextManager(): + snmpEngine = SnmpEngine() + max_repetitions = 6 + objects = bulkWalkCmd( + snmpEngine, + CommunityData("public"), + # await UdpTransportTarget.create(("localhost", AGENT_PORT)), + await Udp6TransportTarget.create(("demo.pysnmp.com", 161)), + ContextData(), + 0, + max_repetitions, + ObjectType(ObjectIdentity("SNMPv2-MIB", "system")), + lexicographicMode=False, + ) + + objects_list = [obj async for obj in objects] + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[0] + + assert errorIndication is None + assert errorStatus == 0 + assert len(varBinds) == max_repetitions + assert varBinds[0][0].prettyPrint() == "SNMPv2-MIB::sysDescr.0" + + errorIndication, errorStatus, errorIndex, varBinds = objects_list[2] + assert len(varBinds) == 4 + + assert len(objects_list) == 3 + snmpEngine.closeDispatcher()