From 006627df4177851a8be110bd38f66bcd2ad7625f Mon Sep 17 00:00:00 2001 From: Andreas Rammhold Date: Sat, 27 Feb 2016 20:58:34 +0100 Subject: [PATCH 1/4] Consolidated redundant code --- isc_dhcp_leases/iscdhcpleases.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/isc_dhcp_leases/iscdhcpleases.py b/isc_dhcp_leases/iscdhcpleases.py index 0ba54b2..e514da6 100644 --- a/isc_dhcp_leases/iscdhcpleases.py +++ b/isc_dhcp_leases/iscdhcpleases.py @@ -2,7 +2,8 @@ import datetime import codecs import struct -import binascii + +from six import iteritems def parse_time(s): @@ -15,6 +16,16 @@ def parse_time(s): return datetime.datetime(*map(int, (year, mon, day, hour, minute, sec))) +def _parse_options(data): + options = {} + for key, value in iteritems(data): + if key.startswith('option '): + name = key.split(' ', 3)[1] + options[name] = value + + return options + + class IscDhcpLeases(object): """ Class to parse isc-dhcp-server lease files into lease objects @@ -106,11 +117,7 @@ def __init__(self, ip, data): else: self.end = parse_time(data['ends']) - self.options = {} - for key in data: - if key.startswith('option '): - part = key.split(' ') - self.options[part[1]] = self.data[key] + self.options = _parse_options(self.data) self._hardware = data['hardware'].split(' ') self.ethernet = self._hardware[1] @@ -179,11 +186,7 @@ def __init__(self, ip, data, cltt, host_identifier, address_type): else: self.end = parse_time(data['ends']) - self.options = {} - for key in data: - if key.startswith('option '): - part = key.split(' ') - self.options[part[1]] = self.data[key] + self.options = _parse_options(self.data) self.preferred_life = int(data['preferred-life']) self.max_life = int(data['max-life']) From ba5ce8dae46c005f2356db424a31ca6a99de16cd Mon Sep 17 00:00:00 2001 From: Andreas Rammhold Date: Sat, 27 Feb 2016 20:59:08 +0100 Subject: [PATCH 2/4] Reordered imports --- isc_dhcp_leases/iscdhcpleases.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/isc_dhcp_leases/iscdhcpleases.py b/isc_dhcp_leases/iscdhcpleases.py index e514da6..15cd295 100644 --- a/isc_dhcp_leases/iscdhcpleases.py +++ b/isc_dhcp_leases/iscdhcpleases.py @@ -1,6 +1,7 @@ -import re -import datetime +import binascii import codecs +import datetime +import re import struct from six import iteritems From b500266a95a1f2d9c204d5baf6bbc27f4719e918 Mon Sep 17 00:00:00 2001 From: Andreas Rammhold Date: Sat, 27 Feb 2016 22:08:54 +0100 Subject: [PATCH 3/4] Added 'set' statements to the lease object & consolidated some shared code --- isc_dhcp_leases/iscdhcpleases.py | 88 ++++++++++++++++++-------------- 1 file changed, 51 insertions(+), 37 deletions(-) diff --git a/isc_dhcp_leases/iscdhcpleases.py b/isc_dhcp_leases/iscdhcpleases.py index 15cd295..d25fd21 100644 --- a/isc_dhcp_leases/iscdhcpleases.py +++ b/isc_dhcp_leases/iscdhcpleases.py @@ -17,16 +17,25 @@ def parse_time(s): return datetime.datetime(*map(int, (year, mon, day, hour, minute, sec))) -def _parse_options(data): - options = {} +def _parse_data(data, prefix): for key, value in iteritems(data): - if key.startswith('option '): + if key.startswith(prefix): name = key.split(' ', 3)[1] - options[name] = value + yield name, value +def _parse_options(data): + options = dict(_parse_data(data, 'option ')) return options +def _parse_set(data): + sets = {} + for key, value in _parse_data(data, 'set '): + sets[key] = value.split('= "', 1)[1][:-1] + + return sets + + class IscDhcpLeases(object): """ Class to parse isc-dhcp-server lease files into lease objects @@ -39,7 +48,7 @@ def __init__(self, filename): self.regex_leaseblock = re.compile(r"lease (?P\d+\.\d+\.\d+\.\d+) {(?P[\s\S]+?)\n}") self.regex_leaseblock6 = re.compile( r"ia-(?Pta|na|pd) \"(?P[^\"\\]*(?:\\.[^\"\\]*)*)\" {(?P[\s\S]+?)\n}") - self.regex_properties = re.compile(r"\s+(?Poption\s+\S+|\S+) (?P[\s\S]+?);") + self.regex_properties = re.compile(r"\s+(?P(option|set)\s+\S+|\S+) (?P[\s\S]+?);") self.regex_iaaddr = re.compile(r"ia(addr|prefix) (?P[0-9a-f:]+(/[0-9]+)?) {(?P[\s\S]+?)\n\s+}") def get(self): @@ -53,7 +62,7 @@ def get(self): block = match.groupdict() properties = self.regex_properties.findall(block['config']) - properties = {key: value for (key, value) in properties} + properties = {key: value for (key, _, value) in properties} if 'hardware' not in properties: # E.g. rows like {'binding': 'state abandoned', ...} continue @@ -63,7 +72,7 @@ def get(self): for match in self.regex_leaseblock6.finditer(lease_data): block = match.groupdict() properties = self.regex_properties.findall(block['config']) - properties = {key: value for (key, value) in properties} + properties = {key: value for (key, _, value) in properties} host_identifier = block['id'] block_type = block['type'] last_client_communication = parse_time(properties['cltt']) @@ -71,7 +80,7 @@ def get(self): for address_block in self.regex_iaaddr.finditer(block['config']): block = address_block.groupdict() properties = self.regex_properties.findall(block['config']) - properties = {key: value for (key, value) in properties} + properties = {key: value for (key, _, value) in properties} lease = Lease6(block['ip'], properties, last_client_communication, host_identifier, block_type) leases.append(lease) @@ -94,7 +103,34 @@ def get_current(self): return leases -class Lease(object): +class BaseLease(object): + """ + Base Implementation for all leases. This does most of the common work that is shared among v4 and v6 leases. + + Attributes: + ip The IP address assigned by this lease as string + data Dict of all the info in the dhcpd.leases file for this lease + options Options on this lease + sets Dict of key-value set statement values from this lease + """ + + def __init__(self, ip, data): + self.ip = ip + self.data = data + self.options = _parse_options(self.data) + self.sets = _parse_set(self.data) + self.binding_state = " ".join(data['binding'].split(' ')[1:]) + + @property + def active(self): + """ + Shorthand to check if the binding_state is active + :return: bool: True if lease is active + """ + return self.binding_state == 'active' + + +class Lease(BaseLease): """ Representation of a IPv4 dhcp lease @@ -110,21 +146,18 @@ class Lease(object): """ def __init__(self, ip, data): - self.data = data - self.ip = ip + super(Lease, self).__init__(ip, data) + self.start = parse_time(data['starts']) if data['ends'] == 'never': self.end = None else: self.end = parse_time(data['ends']) - self.options = _parse_options(self.data) - self._hardware = data['hardware'].split(' ') self.ethernet = self._hardware[1] self.hardware = self._hardware[0] self.hostname = data.get('client-hostname', '').replace("\"", "") - self.binding_state = " ".join(data['binding'].split(' ')[1:]) @property def valid(self): @@ -137,14 +170,6 @@ def valid(self): else: return self.start <= datetime.datetime.utcnow() <= self.end - @property - def active(self): - """ - Shorthand to check if the binding_state is active - :return: bool: True if lease is active - """ - return self.binding_state == 'active' - def __repr__(self): return "".format(self.ip, self.ethernet, self.hostname) @@ -152,7 +177,7 @@ def __eq__(self, other): return self.ip == other.ip and self.ethernet == other.ethernet and self.start == other.start -class Lease6(object): +class Lease6(BaseLease): """ Representation of a IPv6 dhcp lease @@ -172,9 +197,9 @@ class Lease6(object): (TEMPORARY, NON_TEMPORARY, PREFIX_DELEGATION) = ('ta', 'na', 'pd') - def __init__(self, ip, data, cltt, host_identifier, address_type): - self.data = data - self.ip = ip + def __init__(self, ip, data, cltt, host_identifier, address_type): + super(Lease6, self).__init__(ip, data) + self.type = address_type self.last_communication = cltt @@ -187,11 +212,8 @@ def __init__(self, ip, data, cltt, host_identifier, address_type): else: self.end = parse_time(data['ends']) - self.options = _parse_options(self.data) - self.preferred_life = int(data['preferred-life']) self.max_life = int(data['max-life']) - self.binding_state = " ".join(data['binding'].split(' ')[1:]) @property def host_identifier_string(self): @@ -211,14 +233,6 @@ def valid(self): else: return datetime.datetime.utcnow() <= self.end - @property - def active(self): - """ - Shorthand to check if the binding_state is active - :return: bool: True if lease is active - """ - return self.binding_state == 'active' - def __repr__(self): return "".format(self.ip) From e6734823a70eea8301eb6dedf0f84a88b409f94b Mon Sep 17 00:00:00 2001 From: Andreas Rammhold Date: Sat, 27 Feb 2016 22:30:55 +0100 Subject: [PATCH 4/4] Added tests for 'set' statements --- isc_dhcp_leases/test_files/debian7.leases | 1 + isc_dhcp_leases/test_iscDhcpLeases.py | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/isc_dhcp_leases/test_files/debian7.leases b/isc_dhcp_leases/test_files/debian7.leases index 98823ce..33ee080 100644 --- a/isc_dhcp_leases/test_files/debian7.leases +++ b/isc_dhcp_leases/test_files/debian7.leases @@ -9,6 +9,7 @@ lease 10.0.0.10 { binding state free; hardware ethernet 60:a4:4c:b5:6a:dd; uid "\377\000\000\000\002\000\001\000\001\0321\301\300\000#\213\360F\350"; + set vendor-class-identifier = "Some Vendor Identifier"; } lease 10.0.0.15 { starts 4 2014/01/23 13:40:45; diff --git a/isc_dhcp_leases/test_iscDhcpLeases.py b/isc_dhcp_leases/test_iscDhcpLeases.py index ca46d5f..763f655 100644 --- a/isc_dhcp_leases/test_iscDhcpLeases.py +++ b/isc_dhcp_leases/test_iscDhcpLeases.py @@ -21,6 +21,7 @@ def test_get(self): self.assertEqual(result[0].hostname, "") self.assertEqual(result[0].start, datetime(2013, 12, 10, 12, 57, 4)) self.assertEqual(result[0].end, datetime(2013, 12, 10, 13, 7, 4)) + self.assertEqual(result[0].sets, {'vendor-class-identifier': 'Some Vendor Identifier'}) leases = IscDhcpLeases("isc_dhcp_leases/test_files/pfsense.leases") result = leases.get() @@ -76,6 +77,7 @@ def test_get(self): self.assertEqual(result[0].max_life, 864) self.assertEqual(result[0].last_communication, datetime(2016, 1, 6, 14, 50, 34)) self.assertEqual(result[0].type, Lease6.NON_TEMPORARY) + self.assertEqual(result[0].sets, dict(iana='2001:10:10:0:0:0:0:106', clientduid='0100011cf710a5002722332b34')) self.assertEqual(result[1].ip, "2001:10:30:ff00::/56") self.assertEqual(result[1].host_identifier, b"\x00\x00\x00\x00\x00\x01\x00\x01\x1d4L\x00\x00%\x90k\xa14") @@ -88,6 +90,9 @@ def test_get(self): self.assertEqual(result[1].max_life, 864) self.assertEqual(result[1].last_communication, datetime(2016, 1, 6, 14, 52, 37)) self.assertEqual(result[1].type, Lease6.PREFIX_DELEGATION) + self.assertEqual(result[1].sets, dict(iapd='2001:10:30:ff00:0:0:0:0', pdsize='56', + pdnet='2001:10:30:ff00:0:0:0:0/56', + clientduid='0100011d344c000025906ba134')) leases = IscDhcpLeases("isc_dhcp_leases/test_files/options.leases") result = leases.get()