This repository has been archived by the owner on Feb 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
high_volume_subnets_single_customer.py
executable file
·183 lines (156 loc) · 6.66 KB
/
high_volume_subnets_single_customer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python3
import ipaddress
import datetime
from confluent_kafka import Consumer
from ssl import get_default_verify_paths
# The following needs to be available in PYTHONPATH
# https://github.com/bwNetFlow/protobuf
import flow_messages_enriched_pb2 as api
# example output:
"""
2019-09-10 15:04:07.913857: 192.168.24.87/32 averaged 815Mbps (of 6024Mbps total)
"""
# skip to the `if __name__ == "__main__":` section for the actual operation; the following classes implement the counting
class TrieNode:
    """A node of a binary prefix trie that aggregates a volume per IP prefix.

    Each node stands for the prefix ``content/depth``. Inserting an address
    into the root walks the trie one bit per level, starting at the most
    significant bit. It adds the given volume to every node on that path, so a
    node's ``volume`` is the sum of everything inserted below it.
    """

    def __init__(self, version=6, root=False):
        self.lnode = None  # child whose next address bit is 0
        self.rnode = None  # child whose next address bit is 1
        self.depth = 0  # prefix length of this node
        self.content = 0  # network address of this node's prefix, as an int
        self.volume = 0.0  # accumulated volume, in whatever unit insert() is given
        self.__root = root  # only the root node may accept insertions
        self.version = version
        if self.version == 4:
            self.maxlen = 32
        elif self.version == 6:
            self.maxlen = 128
        else:
            raise NotImplementedError

    def __getitem__(self, bit):
        """Return the child for ``bit`` (0 -> lnode, 1 -> rnode)."""
        if bit == 0:
            return self.lnode
        elif bit == 1:
            return self.rnode
        else:
            raise IndexError

    def __setitem__(self, bit, node):
        """Set the child for ``bit`` (0 -> lnode, 1 -> rnode)."""
        if bit == 0:
            self.lnode = node
        elif bit == 1:
            self.rnode = node
        else:
            raise IndexError

    def netaddr(self):
        """Return this node's prefix in CIDR notation, e.g. ``"10.0.0.0/8"``.

        The address class comes from the trie's version, not from the integer
        value. ``ipaddress.ip_address`` would render small IPv6 values (such
        as ``::`` or ``::1``) as IPv4 addresses.
        """
        if self.version == 4:
            address_cls = ipaddress.IPv4Address
        else:
            address_cls = ipaddress.IPv6Address
        return "{}/{}".format(address_cls(self.content), self.depth)

    def insert(self, addr, volume):
        """Add ``volume`` to every prefix node on the path to ``addr``.

        Missing nodes are created on the way down, and each node's
        ``depth``/``content`` is (re)set to the prefix it represents.

        :param addr: address as a non-negative int; only its low ``maxlen``
            bits are examined
        :param volume: amount added to each node on the path
        :raises AttributeError: if called on a non-root node
        """
        # Guard lives here instead of in a __getattribute__ override, which
        # would slow down every attribute access on every node.
        if not self.__root:
            raise AttributeError("Method 'insert' is only valid on the root node.")
        current = self
        content = 0
        for i in range(self.maxlen + 1):  # descend the trie, bit by bit
            current.depth = i
            current.volume += volume
            current.content = content
            if i < self.maxlen:
                shift = self.maxlen - 1 - i
                bit = (addr >> shift) & 1
                if current[bit] is None:
                    current[bit] = TrieNode(version=self.version)
                current = current[bit]
                content |= bit << shift

    def get_prefixes(self, limit):
        """Return the deepest nodes below (and including) self whose volume exceeds ``limit``.

        A leaf is returned if its volume exceeds ``limit``. An inner node is
        returned only if none of its descendants was returned and its own
        aggregated volume exceeds ``limit``.

        :param limit: threshold compared against ``volume`` (strictly greater)
        :returns: set of TrieNode
        """
        # end of recursion: a leaf
        if not self.lnode and not self.rnode:
            return {self} if self.volume > limit else set()
        # recurse into both children, if present
        prefixes = set()
        if self.lnode:
            prefixes.update(self.lnode.get_prefixes(limit))
        if self.rnode:
            prefixes.update(self.rnode.get_prefixes(limit))
        # only the aggregate of both children reaches the limit: report self
        if not prefixes and self.volume > limit:
            prefixes.add(self)
        return prefixes

    def __str__(self):
        """Pretty-print the prefix, its volume and which children exist."""
        content = self.netaddr()
        lnode = "✔" if self.lnode else "✗"
        rnode = "✔" if self.rnode else "✗"
        volume = int(self.volume)
        return f"<TrieNode {content} ({volume}) [{lnode}, {rnode}]>"

    def __repr__(self):
        return str(self)
class BitCounter:
    """Count flow bytes per destination prefix, keeping one trie per address family.

    IPv4 flows (Etype 0x0800) are counted in ``ipv4_trie`` and IPv6 flows
    (Etype 0x86dd) in ``ipv6_trie``.
    """

    def __init__(self):
        self.ipv4_trie = TrieNode(version=4, root=True)
        self.ipv6_trie = TrieNode(version=6, root=True)

    def insert(self, flowmsg):
        """Add ``flowmsg.Bytes`` along the trie path of ``flowmsg.DstAddr``.

        :raises NotImplementedError: if the Etype is neither IPv4 nor IPv6
        """
        trie_by_etype = {
            0x0800: self.ipv4_trie,  # ipv4
            0x86dd: self.ipv6_trie,  # ipv6
        }
        trie = trie_by_etype.get(flowmsg.Etype)
        if trie is None:
            raise NotImplementedError
        destination = int.from_bytes(flowmsg.DstAddr, "big")
        trie.insert(destination, int(flowmsg.Bytes))

    def get_total(self):
        """Return the combined volume counted in both tries."""
        v4_total = self.ipv4_trie.volume
        v6_total = self.ipv6_trie.volume
        return v4_total + v6_total

    def get_prefixes(self, limit):
        """Return ``(ipv4_prefixes, ipv6_prefixes)`` whose volume exceeds ``limit``."""
        v4_prefixes = self.ipv4_trie.get_prefixes(limit)
        v6_prefixes = self.ipv6_trie.get_prefixes(limit)
        return v4_prefixes, v6_prefixes
if __name__ == "__main__":
    # Example credentials; usernames are prefixed with the customer id.
    # NOTE(review): avoid committing real credentials - load them from the
    # environment or a secrets store instead.
    USERNAME = "001-johndoe"
    PASSWORD = "ASECUREPASSWORDCONAiNSNUMBERS,2"
    # consumer group names are prefixed with the username
    GROUP = USERNAME + "-subnet_volume-dev"
    # boilerplate for initializing a Kafka consumer
    consumer = Consumer(
        {
            "bootstrap.servers": "kafka01.example.com:9093",
            "group.id": GROUP,
            "security.protocol": "sasl_ssl",
            "ssl.ca.location": get_default_verify_paths().cafile,
            "sasl.mechanisms": "PLAIN",
            "sasl.username": USERNAME,
            "sasl.password": PASSWORD,
        }
    )
    # the name of the topic is usually the customer id plus some description;
    # in this case, it's the default customer-specific stream
    consumer.subscribe(['001-flows'])
    # length of each counting interval
    INTERVAL = datetime.timedelta(minutes=1)
    # threshold, in bytes counted per INTERVAL, above which subnets are logged;
    # it only makes sense relative to INTERVAL
    VOLUME_ALERT_LIMIT = 100 * 1024**2

    def to_mbps(byte_count, seconds):
        """Convert a byte count observed over ``seconds`` into average Mbit/s."""
        return byte_count * 8 / seconds / 10**6

    try:
        while True:
            counter = BitCounter()  # start a fresh count for every interval
            start_time = datetime.datetime.now()
            finish_time = start_time + INTERVAL
            while True:
                remaining = (finish_time - datetime.datetime.now()).total_seconds()
                if remaining <= 0:
                    break
                # Bounded wait: an idle topic must not stall the report forever.
                raw = consumer.poll(timeout=remaining)
                if raw is None:
                    continue  # timed out without receiving a message
                if raw.error():
                    print(raw.error())
                    continue  # errors are ignored in this example
                # parse the received message using protobuf
                flowmsg = api.FlowMessage()
                flowmsg.ParseFromString(raw.value())
                # the following filters for flows which are of interest:
                # the flow is incoming AND from the peer in question
                if flowmsg.FlowDirection == 0 and "DFN" in flowmsg.SrcIfDesc:
                    try:
                        # inserts the flow volume into a prefix trie
                        counter.insert(flowmsg)
                    except NotImplementedError:
                        continue  # neither IPv4 nor IPv6; nothing to count
            # Rates are averaged over the time actually spent counting.
            # NOTE(review): assumes flowmsg.Bytes is already scaled for any
            # flow sampling - confirm against the producer.
            elapsed = (datetime.datetime.now() - start_time).total_seconds()
            total_mbps = int(to_mbps(counter.get_total(), elapsed))
            # all highest nodes exceeding the limit...
            v4, v6 = counter.get_prefixes(VOLUME_ALERT_LIMIT)
            for p in v4 | v6:  # ... and we print them all for now
                print(f"{finish_time}: {p.netaddr()} averaged {int(to_mbps(p.volume, elapsed))}Mbps (of {total_mbps}Mbps total)")
    except KeyboardInterrupt:
        pass  # Ctrl-C is the intended way to stop this script
    finally:
        consumer.close()  # always leave the consumer group cleanly