diff --git a/isc_dhcp_leases/iscdhcpleases.py b/isc_dhcp_leases/iscdhcpleases.py index 0ba54b2..d25fd21 100644 --- a/isc_dhcp_leases/iscdhcpleases.py +++ b/isc_dhcp_leases/iscdhcpleases.py @@ -1,8 +1,10 @@ -import re -import datetime +import binascii import codecs +import datetime +import re import struct -import binascii + +from six import iteritems def parse_time(s): @@ -15,6 +17,25 @@ def parse_time(s): return datetime.datetime(*map(int, (year, mon, day, hour, minute, sec))) +def _parse_data(data, prefix): + for key, value in iteritems(data): + if key.startswith(prefix): + name = key.split(' ', 3)[1] + yield name, value + +def _parse_options(data): + options = dict(_parse_data(data, 'option ')) + return options + + +def _parse_set(data): + sets = {} + for key, value in _parse_data(data, 'set '): + sets[key] = value.split('= "', 1)[1][:-1] + + return sets + + class IscDhcpLeases(object): """ Class to parse isc-dhcp-server lease files into lease objects @@ -27,7 +48,7 @@ def __init__(self, filename): self.regex_leaseblock = re.compile(r"lease (?P\d+\.\d+\.\d+\.\d+) {(?P[\s\S]+?)\n}") self.regex_leaseblock6 = re.compile( r"ia-(?Pta|na|pd) \"(?P[^\"\\]*(?:\\.[^\"\\]*)*)\" {(?P[\s\S]+?)\n}") - self.regex_properties = re.compile(r"\s+(?Poption\s+\S+|\S+) (?P[\s\S]+?);") + self.regex_properties = re.compile(r"\s+(?P(option|set)\s+\S+|\S+) (?P[\s\S]+?);") self.regex_iaaddr = re.compile(r"ia(addr|prefix) (?P[0-9a-f:]+(/[0-9]+)?) {(?P[\s\S]+?)\n\s+}") def get(self): @@ -41,7 +62,7 @@ def get(self): block = match.groupdict() properties = self.regex_properties.findall(block['config']) - properties = {key: value for (key, value) in properties} + properties = {key: value for (key, _, value) in properties} if 'hardware' not in properties: # E.g. rows like {'binding': 'state abandoned', ...} continue @@ -51,7 +72,7 @@ def get(self): for match in self.regex_leaseblock6.finditer(lease_data): block = match.groupdict() properties = self.regex_properties.findall(block['config']) - properties = {key: value for (key, value) in properties} + properties = {key: value for (key, _, value) in properties} host_identifier = block['id'] block_type = block['type'] last_client_communication = parse_time(properties['cltt']) @@ -59,7 +80,7 @@ def get(self): for address_block in self.regex_iaaddr.finditer(block['config']): block = address_block.groupdict() properties = self.regex_properties.findall(block['config']) - properties = {key: value for (key, value) in properties} + properties = {key: value for (key, _, value) in properties} lease = Lease6(block['ip'], properties, last_client_communication, host_identifier, block_type) leases.append(lease) @@ -82,7 +103,34 @@ def get_current(self): return leases -class Lease(object): +class BaseLease(object): + """ + Base Implementation for all leases. This does most of the common work that is shared among v4 and v6 leases. + + Attributes: + ip The IP address assigned by this lease as string + data Dict of all the info in the dhcpd.leases file for this lease + options Options on this lease + sets Dict of key-value set statement values from this lease + """ + + def __init__(self, ip, data): + self.ip = ip + self.data = data + self.options = _parse_options(self.data) + self.sets = _parse_set(self.data) + self.binding_state = " ".join(data['binding'].split(' ')[1:]) + + @property + def active(self): + """ + Shorthand to check if the binding_state is active + :return: bool: True if lease is active + """ + return self.binding_state == 'active' + + +class Lease(BaseLease): """ Representation of a IPv4 dhcp lease @@ -98,25 +146,18 @@ class Lease(object): """ def __init__(self, ip, data): - self.data = data - self.ip = ip + super(Lease, self).__init__(ip, data) + self.start = parse_time(data['starts']) if data['ends'] == 'never': self.end = None else: self.end = parse_time(data['ends']) - self.options = {} - for key in data: - if key.startswith('option '): - part = key.split(' ') - self.options[part[1]] = self.data[key] - self._hardware = data['hardware'].split(' ') self.ethernet = self._hardware[1] self.hardware = self._hardware[0] self.hostname = data.get('client-hostname', '').replace("\"", "") - self.binding_state = " ".join(data['binding'].split(' ')[1:]) @property def valid(self): @@ -129,14 +170,6 @@ def valid(self): else: return self.start <= datetime.datetime.utcnow() <= self.end - @property - def active(self): - """ - Shorthand to check if the binding_state is active - :return: bool: True if lease is active - """ - return self.binding_state == 'active' - def __repr__(self): return "".format(self.ip, self.ethernet, self.hostname) @@ -144,7 +177,7 @@ def __eq__(self, other): return self.ip == other.ip and self.ethernet == other.ethernet and self.start == other.start -class Lease6(object): +class Lease6(BaseLease): """ Representation of a IPv6 dhcp lease @@ -164,9 +197,9 @@ class Lease6(object): (TEMPORARY, NON_TEMPORARY, PREFIX_DELEGATION) = ('ta', 'na', 'pd') - def __init__(self, ip, data, cltt, host_identifier, address_type): - self.data = data - self.ip = ip + def __init__(self, ip, data, cltt, host_identifier, address_type): + super(Lease6, self).__init__(ip, data) + self.type = address_type self.last_communication = cltt @@ -179,15 +212,8 @@ def __init__(self, ip, data, cltt, host_identifier, address_type): else: self.end = parse_time(data['ends']) - self.options = {} - for key in data: - if key.startswith('option '): - part = key.split(' ') - self.options[part[1]] = self.data[key] - self.preferred_life = int(data['preferred-life']) self.max_life = int(data['max-life']) - self.binding_state = " ".join(data['binding'].split(' ')[1:]) @property def host_identifier_string(self): @@ -207,14 +233,6 @@ def valid(self): else: return datetime.datetime.utcnow() <= self.end - @property - def active(self): - """ - Shorthand to check if the binding_state is active - :return: bool: True if lease is active - """ - return self.binding_state == 'active' - def __repr__(self): return "".format(self.ip) diff --git a/isc_dhcp_leases/test_files/debian7.leases b/isc_dhcp_leases/test_files/debian7.leases index 98823ce..33ee080 100644 --- a/isc_dhcp_leases/test_files/debian7.leases +++ b/isc_dhcp_leases/test_files/debian7.leases @@ -9,6 +9,7 @@ lease 10.0.0.10 { binding state free; hardware ethernet 60:a4:4c:b5:6a:dd; uid "\377\000\000\000\002\000\001\000\001\0321\301\300\000#\213\360F\350"; + set vendor-class-identifier = "Some Vendor Identifier"; } lease 10.0.0.15 { starts 4 2014/01/23 13:40:45; diff --git a/isc_dhcp_leases/test_iscDhcpLeases.py b/isc_dhcp_leases/test_iscDhcpLeases.py index ca46d5f..763f655 100644 --- a/isc_dhcp_leases/test_iscDhcpLeases.py +++ b/isc_dhcp_leases/test_iscDhcpLeases.py @@ -21,6 +21,7 @@ def test_get(self): self.assertEqual(result[0].hostname, "") self.assertEqual(result[0].start, datetime(2013, 12, 10, 12, 57, 4)) self.assertEqual(result[0].end, datetime(2013, 12, 10, 13, 7, 4)) + self.assertEqual(result[0].sets, {'vendor-class-identifier': 'Some Vendor Identifier'}) leases = IscDhcpLeases("isc_dhcp_leases/test_files/pfsense.leases") result = leases.get() @@ -76,6 +77,7 @@ def test_get(self): self.assertEqual(result[0].max_life, 864) self.assertEqual(result[0].last_communication, datetime(2016, 1, 6, 14, 50, 34)) self.assertEqual(result[0].type, Lease6.NON_TEMPORARY) + self.assertEqual(result[0].sets, dict(iana='2001:10:10:0:0:0:0:106', clientduid='0100011cf710a5002722332b34')) self.assertEqual(result[1].ip, "2001:10:30:ff00::/56") self.assertEqual(result[1].host_identifier, b"\x00\x00\x00\x00\x00\x01\x00\x01\x1d4L\x00\x00%\x90k\xa14") @@ -88,6 +90,9 @@ def test_get(self): self.assertEqual(result[1].max_life, 864) self.assertEqual(result[1].last_communication, datetime(2016, 1, 6, 14, 52, 37)) self.assertEqual(result[1].type, Lease6.PREFIX_DELEGATION) + self.assertEqual(result[1].sets, dict(iapd='2001:10:30:ff00:0:0:0:0', pdsize='56', + pdnet='2001:10:30:ff00:0:0:0:0/56', + clientduid='0100011d344c000025906ba134')) leases = IscDhcpLeases("isc_dhcp_leases/test_files/options.leases") result = leases.get()