From c70bf83c66333a5c94a7aded74bb79eb552c7068 Mon Sep 17 00:00:00 2001 From: Michel Albert Date: Mon, 25 Nov 2019 08:47:44 +0100 Subject: [PATCH] Add support for endOfMibView and noSuchInstance References #66 and #54 The values are now supported but this brings microscopic changes to the API. I will keep this a *minor* change because the previous behaviour of the API was incorrect, and code relying on that behaviour is hence incorrect as well. The biggest impact this could have stems from the fact that code which previously incorrectly raised a "NoSuchOID" exception, now returns ``None`` (in the pythonic API) and either ``NoSuchInstance()`` or ``NoSuchObject()`` in the "raw" API. So any code catching ``NoSuchOID`` might require changes. --- puresnmp/aio/api/raw.py | 54 +++++++++------- puresnmp/api/raw.py | 41 ++++++------ puresnmp/pdu.py | 2 + puresnmp/test/data/get_non_existing_80.hex | 3 + puresnmp/test/data/get_non_existing_81.hex | 3 + .../test/data/gh-issues/66-nosuchinstance.hex | 5 ++ .../test/data/multiget_nosuchinstance.hex | 18 ++++++ puresnmp/test/data/multiget_nosuchobject.hex | 18 ++++++ .../test/gh-issues/test_54_endofmibview.py | 18 +++++- .../test/gh-issues/test_66_nosuchinstance.py | 63 +++++++++++++++++++ puresnmp/test/test_aio_raw.py | 21 ++++--- puresnmp/test/test_pythonized.py | 32 ++++++---- puresnmp/test/test_raw.py | 29 ++++----- puresnmp/test/test_types.py | 28 +++++++++ puresnmp/test/x690/test_types.py | 21 +++---- puresnmp/types.py | 52 ++++++++++++--- puresnmp/x690/types.py | 4 +- 17 files changed, 300 insertions(+), 112 deletions(-) create mode 100644 puresnmp/test/data/get_non_existing_80.hex create mode 100644 puresnmp/test/data/get_non_existing_81.hex create mode 100644 puresnmp/test/data/gh-issues/66-nosuchinstance.hex create mode 100644 puresnmp/test/data/multiget_nosuchinstance.hex create mode 100644 puresnmp/test/data/multiget_nosuchobject.hex create mode 100644 puresnmp/test/gh-issues/test_66_nosuchinstance.py diff --git a/puresnmp/aio/api/raw.py b/puresnmp/aio/api/raw.py index a49a03b..7a436d3 100644 --- a/puresnmp/aio/api/raw.py +++ b/puresnmp/aio/api/raw.py @@ -1,33 +1,31 @@ from __future__ import unicode_literals -from collections import OrderedDict -from typing import TYPE_CHECKING + import logging import sys +from collections import OrderedDict +from typing import TYPE_CHECKING -from ...x690.types import ( - Integer, - ObjectIdentifier, - OctetString, - Sequence, - Type, -) -from ...x690.util import to_bytes, tablify -from ...exc import SnmpError, NoSuchOID, FaultySNMPImplementation +from ...const import ERRORS_STRICT, ERRORS_WARN, Version +from ...exc import FaultySNMPImplementation, NoSuchOID, SnmpError from ...pdu import ( BulkGetRequest, GetNextRequest, GetRequest, SetRequest, - VarBind, - END_OF_MIB_VIEW, + VarBind ) -from ...const import Version, ERRORS_WARN, ERRORS_STRICT -from ..transport import Transport -from ...util import ( - BulkResult, # NOQA (must be here for type detection) - get_unfinished_walk_oids, - group_varbinds, +from ...types import EndOfMibView +from ...util import BulkResult # NOQA (must be here for type detection) +from ...util import get_unfinished_walk_oids, group_varbinds +from ...x690.types import ( + Integer, + ObjectIdentifier, + OctetString, + Sequence, + Type ) +from ...x690.util import tablify, to_bytes +from ..transport import Transport if TYPE_CHECKING: # pragma: no cover # pylint: disable=unused-import, invalid-name, ungrouped-imports @@ -141,11 +139,12 @@ async def multigetnext(ip, community, oids, port=161, timeout=6): 'but got %d' % (len(oids), len(response_object.varbinds))) output = [] - for oid, value in response_object.varbinds: - if value is END_OF_MIB_VIEW: + for varbind in response_object.varbinds: + if varbind.value == EndOfMibView(): + # TODO: Should this not be "continue" in case that other listings + # follow? break - output.append(VarBind(oid, value)) - + output.append(varbind) # Verify that the OIDs we retrieved are successors of the requested OIDs. for requested, retrieved in zip(oids, output): @@ -439,7 +438,14 @@ async def bulkget( # prepare output for listing repeating_out = OrderedDict() # type: Dict[str, Type] for oid, value in repeating_tmp: - if value is END_OF_MIB_VIEW: + if isinstance(value, EndOfMibView): + # An SNMP agent behaving correctly, will return the OIDs ordered, + # and running into an EndOfMibView marker means we have reached the + # end of an accessible tree. + # TODO: What if one of the trees walking over is a restricted + # MIB-View, but there are valid values *after* that view? Should we + # not use "continue" here for that? But what about non-compliant + # SNMP servers which return values unordered? break repeating_out[unicode(oid)] = value diff --git a/puresnmp/api/raw.py b/puresnmp/api/raw.py index 6b8a91f..164f7c4 100644 --- a/puresnmp/api/raw.py +++ b/puresnmp/api/raw.py @@ -10,36 +10,28 @@ case it's recommended to use :py:mod:`puresnmp.api.raw`. ''' from __future__ import unicode_literals -from collections import OrderedDict -from typing import TYPE_CHECKING + import logging import sys +from collections import OrderedDict +from typing import TYPE_CHECKING -from ..x690.types import ( - Integer, - ObjectIdentifier, - OctetString, - Sequence, - Type, -) -from ..x690.util import to_bytes, tablify -from ..exc import SnmpError, NoSuchOID, FaultySNMPImplementation -from ..types import EndOfMibView +from ..const import ERRORS_STRICT, ERRORS_WARN, Version +from ..exc import FaultySNMPImplementation, NoSuchOID, SnmpError from ..pdu import ( + END_OF_MIB_VIEW, BulkGetRequest, GetNextRequest, GetRequest, SetRequest, - VarBind, - END_OF_MIB_VIEW, + VarBind ) -from ..const import Version, ERRORS_WARN, ERRORS_STRICT from ..transport import Transport -from ..util import ( - BulkResult, # NOQA (must be here for type detection) - get_unfinished_walk_oids, - group_varbinds, -) +from ..types import EndOfMibView, NoSuchInstance, NoSuchObject +from ..util import BulkResult # NOQA (must be here for type detection) +from ..util import get_unfinished_walk_oids, group_varbinds +from ..x690.types import Integer, ObjectIdentifier, OctetString, Sequence, Type +from ..x690.util import tablify, to_bytes if TYPE_CHECKING: # pragma: no cover # pylint: disable=unused-import, invalid-name, ungrouped-imports @@ -448,7 +440,14 @@ def bulkget( # prepare output for listing repeating_out = OrderedDict() # type: Dict[str, Type] for oid, value in repeating_tmp: - if value is END_OF_MIB_VIEW: + if isinstance(value, EndOfMibView): + # An SNMP agent behaving correctly, will return the OIDs ordered, + # and running into an EndOfMibView marker means we have reached the + # end of an accessible tree. + # TODO: What if one of the trees walking over is a restricted + # MIB-View, but there are valid values *after* that view? Should we + # not use "continue" here for that? But what about non-compliant + # SNMP servers which return values unordered? break repeating_out[unicode(oid)] = value diff --git a/puresnmp/pdu.py b/puresnmp/pdu.py index d13b6ef..445abf6 100644 --- a/puresnmp/pdu.py +++ b/puresnmp/pdu.py @@ -119,6 +119,8 @@ class PDU(Type): The superclass for SNMP Messages (GET, SET, GETNEXT, ...) """ TYPECLASS = TypeInfo.CONTEXT + ENCODINGS = {TypeInfo.CONSTRUCTED} + TAG = 0x10 @classmethod def decode(cls, data): diff --git a/puresnmp/test/data/get_non_existing_80.hex b/puresnmp/test/data/get_non_existing_80.hex new file mode 100644 index 0000000..8b69cdc --- /dev/null +++ b/puresnmp/test/data/get_non_existing_80.hex @@ -0,0 +1,3 @@ +0000: 30 2A 02 01 01 04 0A 72 65 73 74 72 69 63 74 65 0*.....restricte +0016: 64 A2 19 02 04 74 E6 91 AD 02 01 00 02 01 00 30 d....t.........0 +0032: 0B 30 09 06 05 2A 03 04 05 06 80 00 .0...*...... diff --git a/puresnmp/test/data/get_non_existing_81.hex b/puresnmp/test/data/get_non_existing_81.hex new file mode 100644 index 0000000..c5865c2 --- /dev/null +++ b/puresnmp/test/data/get_non_existing_81.hex @@ -0,0 +1,3 @@ +0000: 30 28 02 01 01 04 06 70 75 62 6C 69 63 A2 1B 02 0(.....public... +0016: 04 6B B5 C4 12 02 01 00 02 01 00 30 0D 30 0B 06 .k.........0.0.. +0032: 07 2B 06 01 02 01 01 02 81 00 .+........ diff --git a/puresnmp/test/data/gh-issues/66-nosuchinstance.hex b/puresnmp/test/data/gh-issues/66-nosuchinstance.hex new file mode 100644 index 0000000..2e8fe7d --- /dev/null +++ b/puresnmp/test/data/gh-issues/66-nosuchinstance.hex @@ -0,0 +1,5 @@ +30 42 02 01 01 04 06 30 30 30 30 30 30 a2 35 02 0B.....000000.5. +04 5d 9d 59 c9 02 01 00 02 01 00 30 27 30 12 06 .].Y.......0'0.. +0d 2b 06 01 04 01 81 8a 31 15 01 02 0a 00 02 01 .+......1....... +01 30 11 06 0d 2b 06 01 04 01 81 8a 31 15 01 03 .0...+......1... +01 00 81 00 .... diff --git a/puresnmp/test/data/multiget_nosuchinstance.hex b/puresnmp/test/data/multiget_nosuchinstance.hex new file mode 100644 index 0000000..83bf4ca --- /dev/null +++ b/puresnmp/test/data/multiget_nosuchinstance.hex @@ -0,0 +1,18 @@ +# -*- ascii-cols: 6-56 -*- +# +# The following dump relates to GitHub issue #66 and contains a "multiget" +# response with the 4 results: +# +# +# .1.2.3.4.5.6.7.8.9 = No Such Object available on this agent at this OID +# .1.3.6.1.2.1.1.1.2.0 = No Such Instance currently exists at this OID +# .1.3.6.1.2.1.1.2.0 = OID: .1.3.6.1.4.1.8072.3.2.10 +# .1.3.6.1.2.1.2.1.0 = INTEGER: 2 + +0000: 30 60 02 01 01 04 07 70 72 69 76 61 74 65 A2 52 0`.....private.R +0016: 02 04 1F A9 F4 E3 02 01 00 02 01 00 30 44 30 0C ............0D0. +0032: 06 08 2A 03 04 05 06 07 08 09 80 00 30 0D 06 09 ..*.........0... +0048: 2B 06 01 02 01 01 01 02 00 81 00 30 16 06 08 2B +..........0...+ +0064: 06 01 02 01 01 02 00 06 0A 2B 06 01 04 01 BF 08 .........+...... +0080: 03 02 0A 30 0D 06 08 2B 06 01 02 01 02 01 00 02 ...0...+........ +0096: 01 02 .. diff --git a/puresnmp/test/data/multiget_nosuchobject.hex b/puresnmp/test/data/multiget_nosuchobject.hex new file mode 100644 index 0000000..60aff00 --- /dev/null +++ b/puresnmp/test/data/multiget_nosuchobject.hex @@ -0,0 +1,18 @@ +# -*- ascii-cols: 6-56 -*- +# +# The following dump relates to GitHub issue #66 and contains a "multiget" +# response with the 4 results: +# +# +# .1.2.3.4.5.6.7.8.9 = No Such Object available on this agent at this OID +# .1.3.6.1.2.1.1.1.2.0 = No Such Object available on this agent at this OID +# .1.3.6.1.2.1.1.2.0 = No Such Object available on this agent at this OID +# .1.3.6.1.2.1.2.1.0 = INTEGER: 2 + +0000: 30 59 02 01 01 04 0A 72 65 73 74 72 69 63 74 65 0Y.....restricte +0016: 64 A2 48 02 04 1C AD 79 DC 02 01 00 02 01 00 30 d.H....y.......0 +0032: 3A 30 0C 06 08 2A 03 04 05 06 07 08 09 80 00 30 :0...*.........0 +0048: 0D 06 09 2B 06 01 02 01 01 01 02 00 80 00 30 0C ...+..........0. +0064: 06 08 2B 06 01 02 01 01 02 00 80 00 30 0D 06 08 ..+.........0... +0080: 2B 06 01 02 01 02 01 00 02 01 02 +.......... + diff --git a/puresnmp/test/gh-issues/test_54_endofmibview.py b/puresnmp/test/gh-issues/test_54_endofmibview.py index 35fbd8f..83a564b 100644 --- a/puresnmp/test/gh-issues/test_54_endofmibview.py +++ b/puresnmp/test/gh-issues/test_54_endofmibview.py @@ -2,7 +2,6 @@ from puresnmp.api.raw import bulkget from puresnmp.test import readbytes -from puresnmp.types import EndOfMibView from puresnmp.x690.types import Integer, OctetString try: @@ -12,6 +11,23 @@ def test_54_endofmibview(): + """ + The dump in ``gh-issues/54-endofmibview.hex`` contains an "endOfMibView" + marker on the OID '1.3.6.1.6.3.10.2.1.4.0'. So that should be the last OID + we see. + + + NOTE: The return-value of ``bulkget`` for listings is a dictionary which + cannot represent the ``endOfMibView`` marker as-is as it would require to + add the OID twice. For the "pythonic" API this is always correct as it is + supposed to hide the SNMP internals. If is debatable whether this behaviour + is wanted in the "raw" API though. For now this is not supported as it + would require a non backwards-compatible change in the API (the data-type + would change). Using something that "behaves like" a dict is also + questionable acceptable as it would be non-deterministic what happens when + accessing the key which caused the ``endOfMibView`` as this is the one that + is duplicated. + """ data = readbytes('gh-issues/54-endofmibview.hex') with patch('puresnmp.api.raw.Transport') as ptch: ptch().send.return_value = data diff --git a/puresnmp/test/gh-issues/test_66_nosuchinstance.py b/puresnmp/test/gh-issues/test_66_nosuchinstance.py new file mode 100644 index 0000000..e830c88 --- /dev/null +++ b/puresnmp/test/gh-issues/test_66_nosuchinstance.py @@ -0,0 +1,63 @@ +# type: ignore +""" +When an SNMP agent returns the type-value 0x81 a NULL value should be reported. + +By definition in RFC-1905 the "value" part in a VarBind can either be a value, +or one of: + + "noSuchObject": the requested OID does not exist + "noSuchInstance": the OID exists but has no value + "endOfMibView": A marker to signify we have seen all that we are allowed to + see in an operation based on "get-next". + +This is (at the time of this writing, 2019-10-10) not handled properly. See +also gihub issue #54. This module defines a test verifyng that we get a correct +value from a noSuchInstance marker. +""" +from puresnmp.api.pythonic import multiget as pyget +from puresnmp.api.raw import multiget +from puresnmp.test import readbytes +from puresnmp.types import NoSuchInstance +from puresnmp.x690.types import Integer + +try: + from unittest.mock import patch +except ImportError: + from mock import patch + + +def test_data_type(): + """ + We want to have a special data-type for noSuchInstance which behaves + properly (like a varbind & implicit null). + """ + value = NoSuchInstance('1.2.3.4.5') + assert bool(value) is False + assert value.oid == '1.2.3.4.5' + assert value.value is None + + +def test_66_nosuchinstance_raw(): + """ + If we get a noSuchInstance VarBind, we want to report this as an + appropriate value. + """ + data = readbytes('gh-issues/66-nosuchinstance.hex') + with patch('puresnmp.api.raw.Transport') as ptch: + ptch().send.return_value = data + ptch().get_request_id.return_value = 123 + result = multiget('192.0.2.1', 'private', ['1.2.3', '2.3.4']) + assert result == [Integer(1), NoSuchInstance('1.2.3')] + + +def test_66_nosuchinstance_pythonic(): + """ + Pure Python has no data-type to represent the same concept as + "noSuchInstance" so we generalise it as ``None`` + """ + data = readbytes('gh-issues/66-nosuchinstance.hex') + with patch('puresnmp.api.raw.Transport') as ptch: + ptch().send.return_value = data + ptch().get_request_id.return_value = 123 + result = pyget('192.0.2.1', 'private', ['1.2.3', '2.3.4']) + assert result == [1, None] diff --git a/puresnmp/test/test_aio_raw.py b/puresnmp/test/test_aio_raw.py index b65059b..2496912 100644 --- a/puresnmp/test/test_aio_raw.py +++ b/puresnmp/test/test_aio_raw.py @@ -17,7 +17,7 @@ from puresnmp.const import Version from puresnmp.exc import NoSuchOID, SnmpError from puresnmp.pdu import BulkGetRequest, GetNextRequest, GetRequest, VarBind -from puresnmp.types import Counter, Gauge, IpAddress, TimeTicks +from puresnmp.types import NoSuchInstance, Counter, Gauge, IpAddress, TimeTicks, NoSuchObject from puresnmp.util import BulkResult from puresnmp.x690.types import (Integer, ObjectIdentifier, OctetString, Sequence, to_bytes) @@ -86,24 +86,24 @@ async def test_get_non_existing_oid_80(self): mck().send = AsyncMock() mck().send.return_value = data mck().get_request_id.return_value = 0 - with pytest.raises(NoSuchOID): - await get('::1', 'private', '1.2.3') + result = await get('::1', 'private', '1.2.3') + assert not bool(result) + assert isinstance(result, NoSuchObject) @pytest.mark.asyncio - async def test_get_non_existing_oid_81(self): + async def test_get_nosuchinstance(self): """ - A "GET" response on a non-existing OID should raise an appropriate - exception. - - This tests the byte-marker 0x81 (TODO explain 0x81) + If we get a "noSuchInstance" (0x81) on an OID, we should get the + appropriate result. """ data = readbytes('get_non_existing_81.hex') with patch('puresnmp.aio.api.raw.Transport') as mck: mck().send = AsyncMock() mck().send.return_value = data mck().get_request_id.return_value = 0 - with pytest.raises(NoSuchOID): - await get('::1', 'private', '1.2.3') + result = await get('::1', 'private', '1.2.3') + assert not bool(result) + assert isinstance(result, NoSuchInstance) class TestWalk(object): @@ -228,6 +228,7 @@ async def test_eom(self): expected = [ (root+'0', 1), (root+'1', 1), + (root+'2', 1), ] simplified_result = [ diff --git a/puresnmp/test/test_pythonized.py b/puresnmp/test/test_pythonized.py index d16ee8d..803157b 100644 --- a/puresnmp/test/test_pythonized.py +++ b/puresnmp/test/test_pythonized.py @@ -8,16 +8,13 @@ """ from __future__ import unicode_literals -import six + +import unittest from datetime import timedelta from ipaddress import ip_address -try: - from unittest.mock import patch, call -except ImportError: - from mock import patch, call # pip install mock -import unittest -from puresnmp.types import Counter, Gauge, IpAddress +import six + from puresnmp.api.pythonic import ( bulkget, bulkwalk, @@ -32,13 +29,20 @@ walk ) from puresnmp.const import Version -from puresnmp.exc import SnmpError, NoSuchOID +from puresnmp.exc import NoSuchOID, SnmpError from puresnmp.pdu import ( - GetRequest, - VarBind, - GetNextRequest, BulkGetRequest, - Trap + GetNextRequest, + GetRequest, + Trap, + VarBind +) +from puresnmp.types import ( + Counter, + Gauge, + IpAddress, + NoSuchInstance, + NoSuchObject ) from puresnmp.util import BulkResult from puresnmp.x690.types import ( @@ -51,6 +55,10 @@ from . import ByteTester +try: + from unittest.mock import patch, call +except ImportError: + from mock import patch, call # pip install mock class TestGet(ByteTester): diff --git a/puresnmp/test/test_raw.py b/puresnmp/test/test_raw.py index c55fb18..5169666 100644 --- a/puresnmp/test/test_raw.py +++ b/puresnmp/test/test_raw.py @@ -102,31 +102,25 @@ def test_get_multiple_return_binds(self): def test_get_non_existing_oid_80(self): """ - A "GET" response on a non-existing OID should raise an appropriate - exception. - - This tests the byte-marker 0x80 (TODO explain 0x80) + The special "no such object" (0x80) value should be supported """ data = readbytes('get_non_existing_80.hex') with patch('puresnmp.api.raw.Transport') as mck: mck().send.return_value = data mck().get_request_id.return_value = 0 - with self.assertRaises(NoSuchOID): - get('::1', 'private', '1.2.3') + result = get('::1', 'private', '1.2.3') + assert result == NoSuchObject() def test_get_non_existing_oid_81(self): """ - A "GET" response on a non-existing OID should raise an appropriate - exception. - - This tests the byte-marker 0x81 (TODO explain 0x81) + The special "no such instance" (0x81) value should be supported """ data = readbytes("get_non_existing_81.hex") with patch("puresnmp.api.raw.Transport") as mck: mck().send.return_value = data mck().get_request_id.return_value = 0 - with self.assertRaises(NoSuchOID): - get("::1", "private", "1.2.3") + result = get("::1", "private", "1.2.3") + assert result == NoSuchInstance() class TestWalk(unittest.TestCase): @@ -223,11 +217,11 @@ def test_nosuchinstance(self): """ OID = ObjectIdentifier.from_string - data = readbytes("multiget_nosuchobject.hex") + data = readbytes("multiget_nosuchinstance.hex") expected = [ - NoSuchObject(ObjectIdentifier("1.2.3.4.5.6.7.8.9")), - NoSuchInstance(ObjectIdentifier("1.3.6.1.2.1.1.1.2.0")), - ObjectIdentifier(".1.3.6.1.4.1.8072.3.2.10"), + NoSuchObject(None), + NoSuchInstance(None), + OID(".1.3.6.1.4.1.8072.3.2.10"), Integer(2), ] with patch("puresnmp.api.raw.Transport") as mck: @@ -247,7 +241,6 @@ def test_nosuchinstance(self): self.assertEqual(result, expected) self.assertFalse(result[0]) self.assertFalse(result[1]) - self.assertFalse(result[2]) class TestMultiWalk(unittest.TestCase): @@ -587,7 +580,7 @@ def test_eom(self): expected_listing = { root + '0': Integer(1), root + '1': Integer(1), - root + '2': Integer(1), + root + '2': Integer(1) } self.assertEqual(result.listing, expected_listing) diff --git a/puresnmp/test/test_types.py b/puresnmp/test/test_types.py index 3c08138..675d713 100644 --- a/puresnmp/test/test_types.py +++ b/puresnmp/test/test_types.py @@ -3,7 +3,9 @@ """ import pytest + from puresnmp import types as t +from puresnmp.x690.types import pop_tlv @pytest.mark.parametrize('value, expected', [ @@ -39,3 +41,29 @@ def test_counter64(value, expected): """ instance = t.Counter64(value) assert instance.value == expected + + +@pytest.mark.parametrize('data, expected', [ + (b'\x80\x00', t.NoSuchObject()), + (b'\x81\x00', t.NoSuchInstance()), + (b'\x82\x00', t.EndOfMibView()), +]) +def test_decode_simple_types(data, expected): + """ + Decoding data into Python objects should yield the expected results + """ + result = pop_tlv(data) + assert result[0] == expected + + +@pytest.mark.parametrize('cls', [ + t.NoSuchObject, + t.NoSuchInstance, + t.EndOfMibView +]) +def test_nullish(cls): + """ + Some types should be represented as "None" in Python + """ + result = cls().pythonize() + assert result is None diff --git a/puresnmp/test/x690/test_types.py b/puresnmp/test/x690/test_types.py index 172e6eb..cd6e4b5 100644 --- a/puresnmp/test/x690/test_types.py +++ b/puresnmp/test/x690/test_types.py @@ -1,29 +1,29 @@ # pylint: skip-file -import six import sys -try: - unicode -except NameError: - unicode = str +import six -from ...x690.util import TypeInfo from ...x690.types import ( Boolean, Integer, - UnknownType, Null, ObjectIdentifier, OctetString, Sequence, Type, + UnknownType, pop_tlv, to_bytes ) - +from ...x690.util import TypeInfo from .. import ByteTester +try: + unicode +except NameError: + unicode = str + class TestBoolean(ByteTester): @@ -621,11 +621,6 @@ def test_validation_wrong_typeclass(self): with self.assertRaises(ValueError): Integer.validate(to_bytes([0b00111110])) - def test_null_from_bytes(self): - result = Type.from_bytes(b'') - expected = Null() - self.assertEqual(result, expected) - def test_corrupt_length(self): with six.assertRaisesRegex(self, ValueError, 'length'): Integer.from_bytes(b'\x02\x01\x01\x01') diff --git a/puresnmp/types.py b/puresnmp/types.py index 571630b..30f1530 100644 --- a/puresnmp/types.py +++ b/puresnmp/types.py @@ -14,36 +14,68 @@ import sys from datetime import timedelta -from ipaddress import IPv4Address, ip_address +from ipaddress import IPv4Address from struct import pack +from typing import Optional -from .x690.types import Integer, Null, OctetString +from .x690.types import Integer, Null, ObjectIdentifier, OctetString from .x690.util import TypeInfo -class NoSuchInstance(Null): # XXX Docs +class NoSuchObject(Null): """ - A sentinel "nullish" value which marks the missing of a conrete value under - the given OID + A sentinel "nullish" value which marks the missing of given OID """ - + # TODO: byte marker = 0x80 TYPECLASS = TypeInfo.CONTEXT TAG = 0x00 + def __init__(self, oid=None): + # type: (Optional[ObjectIdentifier]) -> None + super().__init__() + self.oid = oid -class NoSuchObject(Null): # XXX Docs - """ - A sentinel "nullish" value which marks the missing of given OID - """ + def __repr__(self): + # type: () -> str + return "NoSuchObject(%r)" % self.oid + +class NoSuchInstance(Null): + ''' + A sentinel "nullish" value which marks the missing of a conrete value under + the given OID + ''' + # TODO: byte marker = 0x81 TYPECLASS = TypeInfo.CONTEXT TAG = 0x01 + def __init__(self, oid=None): + # type: (Optional[ObjectIdentifier]) -> None + super().__init__() + self.oid = oid + + def __repr__(self): + # type: () -> str + return "NoSuchInstance(%r)" % self.oid + class EndOfMibView(Null): # XXX Docs + """ + A sentinel "nullish" value which marks the end of a walk/getNext operation. + """ + # TODO: byte marker = 0x82 TYPECLASS = TypeInfo.CONTEXT TAG = 0x02 + def __init__(self, oid=None): + # type: (Optional[ObjectIdentifier]) -> None + super().__init__() + self.oid = oid + + def __repr__(self): + # type: () -> str + return "EndOfMibView(%r)" % self.oid + class IpAddress(OctetString): """ diff --git a/puresnmp/x690/types.py b/puresnmp/x690/types.py index 3b459fc..fb7229b 100644 --- a/puresnmp/x690/types.py +++ b/puresnmp/x690/types.py @@ -176,7 +176,7 @@ def from_bytes(cls, data): """ if not data: - return Null() + return cls() cls.validate(data) expected_length, data = decode_length(data[1:]) if len(data) != expected_length: @@ -423,9 +423,7 @@ def decode(cls, data): # type: (bytes) -> Sequence output = [] while data: - print('\u001b[30;43m[debug-print-6dfa]\u001b[0m', repr(data[:10])) # XXX debug statement value, data = pop_tlv(data) - print('\u001b[30;43m[debug-print-5b03]\u001b[0m', repr(value)) # XXX debug statement output.append(value) return Sequence(*output)