Skip to content

Commit

Permalink
Merge pull request #10 from andir/added-sets
Browse files Browse the repository at this point in the history
Added support for set statements & made some efforts on reducing code duplication
  • Loading branch information
MartijnBraam committed Feb 27, 2016
2 parents 6d44934 + e673482 commit 96cbb2d
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 44 deletions.
106 changes: 62 additions & 44 deletions isc_dhcp_leases/iscdhcpleases.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import re
import datetime
import binascii
import codecs
import datetime
import re
import struct
import binascii

from six import iteritems


def parse_time(s):
Expand All @@ -15,6 +17,25 @@ def parse_time(s):
return datetime.datetime(*map(int, (year, mon, day, hour, minute, sec)))


def _parse_data(data, prefix):
for key, value in iteritems(data):
if key.startswith(prefix):
name = key.split(' ', 3)[1]
yield name, value

def _parse_options(data):
options = dict(_parse_data(data, 'option '))
return options


def _parse_set(data):
sets = {}
for key, value in _parse_data(data, 'set '):
sets[key] = value.split('= "', 1)[1][:-1]

return sets


class IscDhcpLeases(object):
"""
Class to parse isc-dhcp-server lease files into lease objects
Expand All @@ -27,7 +48,7 @@ def __init__(self, filename):
self.regex_leaseblock = re.compile(r"lease (?P<ip>\d+\.\d+\.\d+\.\d+) {(?P<config>[\s\S]+?)\n}")
self.regex_leaseblock6 = re.compile(
r"ia-(?P<type>ta|na|pd) \"(?P<id>[^\"\\]*(?:\\.[^\"\\]*)*)\" {(?P<config>[\s\S]+?)\n}")
self.regex_properties = re.compile(r"\s+(?P<key>option\s+\S+|\S+) (?P<value>[\s\S]+?);")
self.regex_properties = re.compile(r"\s+(?P<key>(option|set)\s+\S+|\S+) (?P<value>[\s\S]+?);")
self.regex_iaaddr = re.compile(r"ia(addr|prefix) (?P<ip>[0-9a-f:]+(/[0-9]+)?) {(?P<config>[\s\S]+?)\n\s+}")

def get(self):
Expand All @@ -41,7 +62,7 @@ def get(self):
block = match.groupdict()

properties = self.regex_properties.findall(block['config'])
properties = {key: value for (key, value) in properties}
properties = {key: value for (key, _, value) in properties}
if 'hardware' not in properties:
# E.g. rows like {'binding': 'state abandoned', ...}
continue
Expand All @@ -51,15 +72,15 @@ def get(self):
for match in self.regex_leaseblock6.finditer(lease_data):
block = match.groupdict()
properties = self.regex_properties.findall(block['config'])
properties = {key: value for (key, value) in properties}
properties = {key: value for (key, _, value) in properties}
host_identifier = block['id']
block_type = block['type']
last_client_communication = parse_time(properties['cltt'])

for address_block in self.regex_iaaddr.finditer(block['config']):
block = address_block.groupdict()
properties = self.regex_properties.findall(block['config'])
properties = {key: value for (key, value) in properties}
properties = {key: value for (key, _, value) in properties}

lease = Lease6(block['ip'], properties, last_client_communication, host_identifier, block_type)
leases.append(lease)
Expand All @@ -82,7 +103,34 @@ def get_current(self):
return leases


class Lease(object):
class BaseLease(object):
"""
Base Implementation for all leases. This does most of the common work that is shared among v4 and v6 leases.
Attributes:
ip The IP address assigned by this lease as string
data Dict of all the info in the dhcpd.leases file for this lease
options Options on this lease
sets Dict of key-value set statement values from this lease
"""

def __init__(self, ip, data):
self.ip = ip
self.data = data
self.options = _parse_options(self.data)
self.sets = _parse_set(self.data)
self.binding_state = " ".join(data['binding'].split(' ')[1:])

@property
def active(self):
"""
Shorthand to check if the binding_state is active
:return: bool: True if lease is active
"""
return self.binding_state == 'active'


class Lease(BaseLease):
"""
Representation of a IPv4 dhcp lease
Expand All @@ -98,25 +146,18 @@ class Lease(object):
"""

def __init__(self, ip, data):
self.data = data
self.ip = ip
super(Lease, self).__init__(ip, data)

self.start = parse_time(data['starts'])
if data['ends'] == 'never':
self.end = None
else:
self.end = parse_time(data['ends'])

self.options = {}
for key in data:
if key.startswith('option '):
part = key.split(' ')
self.options[part[1]] = self.data[key]

self._hardware = data['hardware'].split(' ')
self.ethernet = self._hardware[1]
self.hardware = self._hardware[0]
self.hostname = data.get('client-hostname', '').replace("\"", "")
self.binding_state = " ".join(data['binding'].split(' ')[1:])

@property
def valid(self):
Expand All @@ -129,22 +170,14 @@ def valid(self):
else:
return self.start <= datetime.datetime.utcnow() <= self.end

@property
def active(self):
"""
Shorthand to check if the binding_state is active
:return: bool: True if lease is active
"""
return self.binding_state == 'active'

def __repr__(self):
return "<Lease {} for {} ({})>".format(self.ip, self.ethernet, self.hostname)

def __eq__(self, other):
return self.ip == other.ip and self.ethernet == other.ethernet and self.start == other.start


class Lease6(object):
class Lease6(BaseLease):
"""
Representation of a IPv6 dhcp lease
Expand All @@ -164,9 +197,9 @@ class Lease6(object):

(TEMPORARY, NON_TEMPORARY, PREFIX_DELEGATION) = ('ta', 'na', 'pd')

def __init__(self, ip, data, cltt, host_identifier, address_type):
self.data = data
self.ip = ip
def __init__(self, ip, data, cltt, host_identifier, address_type):
super(Lease6, self).__init__(ip, data)

self.type = address_type
self.last_communication = cltt

Expand All @@ -179,15 +212,8 @@ def __init__(self, ip, data, cltt, host_identifier, address_type):
else:
self.end = parse_time(data['ends'])

self.options = {}
for key in data:
if key.startswith('option '):
part = key.split(' ')
self.options[part[1]] = self.data[key]

self.preferred_life = int(data['preferred-life'])
self.max_life = int(data['max-life'])
self.binding_state = " ".join(data['binding'].split(' ')[1:])

@property
def host_identifier_string(self):
Expand All @@ -207,14 +233,6 @@ def valid(self):
else:
return datetime.datetime.utcnow() <= self.end

@property
def active(self):
"""
Shorthand to check if the binding_state is active
:return: bool: True if lease is active
"""
return self.binding_state == 'active'

def __repr__(self):
return "<Lease6 {}>".format(self.ip)

Expand Down
1 change: 1 addition & 0 deletions isc_dhcp_leases/test_files/debian7.leases
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ lease 10.0.0.10 {
binding state free;
hardware ethernet 60:a4:4c:b5:6a:dd;
uid "\377\000\000\000\002\000\001\000\001\0321\301\300\000#\213\360F\350";
set vendor-class-identifier = "Some Vendor Identifier";
}
lease 10.0.0.15 {
starts 4 2014/01/23 13:40:45;
Expand Down
5 changes: 5 additions & 0 deletions isc_dhcp_leases/test_iscDhcpLeases.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def test_get(self):
self.assertEqual(result[0].hostname, "")
self.assertEqual(result[0].start, datetime(2013, 12, 10, 12, 57, 4))
self.assertEqual(result[0].end, datetime(2013, 12, 10, 13, 7, 4))
self.assertEqual(result[0].sets, {'vendor-class-identifier': 'Some Vendor Identifier'})

leases = IscDhcpLeases("isc_dhcp_leases/test_files/pfsense.leases")
result = leases.get()
Expand Down Expand Up @@ -76,6 +77,7 @@ def test_get(self):
self.assertEqual(result[0].max_life, 864)
self.assertEqual(result[0].last_communication, datetime(2016, 1, 6, 14, 50, 34))
self.assertEqual(result[0].type, Lease6.NON_TEMPORARY)
self.assertEqual(result[0].sets, dict(iana='2001:10:10:0:0:0:0:106', clientduid='0100011cf710a5002722332b34'))

self.assertEqual(result[1].ip, "2001:10:30:ff00::/56")
self.assertEqual(result[1].host_identifier, b"\x00\x00\x00\x00\x00\x01\x00\x01\x1d4L\x00\x00%\x90k\xa14")
Expand All @@ -88,6 +90,9 @@ def test_get(self):
self.assertEqual(result[1].max_life, 864)
self.assertEqual(result[1].last_communication, datetime(2016, 1, 6, 14, 52, 37))
self.assertEqual(result[1].type, Lease6.PREFIX_DELEGATION)
self.assertEqual(result[1].sets, dict(iapd='2001:10:30:ff00:0:0:0:0', pdsize='56',
pdnet='2001:10:30:ff00:0:0:0:0/56',
clientduid='0100011d344c000025906ba134'))

leases = IscDhcpLeases("isc_dhcp_leases/test_files/options.leases")
result = leases.get()
Expand Down

0 comments on commit 96cbb2d

Please sign in to comment.