Skip to content

Commit c85eb60

Browse files
committed
Implement NetFlow bot
1 parent a5ea104 commit c85eb60

File tree

3 files changed

+99
-5
lines changed

3 files changed

+99
-5
lines changed

lookup.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
REDIS_HASH_TRAFFIC_PER_PROTOCOL = "tpp"
2+
13
PROTOCOLS = {
24
0: "HOPOPT",
35
1: "ICMP",

netflowbot.py

Lines changed: 91 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
from colors import color
1111
import dotenv
1212
import redis
13+
import requests
1314

14-
from grafoleancollector import Collector
15+
from grafoleancollector import Collector, send_results_to_grafolean
1516

17+
from lookup import PROTOCOLS, REDIS_HASH_TRAFFIC_PER_PROTOCOL
1618

1719
logging.basicConfig(format='%(asctime)s | %(levelname)s | %(message)s',
1820
datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG)
@@ -27,8 +29,95 @@
2729
r = redis.Redis(host=REDIS_HOST)
2830

2931

32+
REDIS_PREFIX = 'netflow'
33+
34+
3035
class NetFlowBot(Collector):
    """Grafolean collector bot that periodically converts NetFlow-derived
    counters stored in Redis into values and ships them to the backend."""

    def jobs(self):
        """Yield (job_id, intervals, job_func, job_params) tuples describing
        the periodic jobs this bot should run.

        The real implementation will build jobs from the entity/sensor
        configuration fetched from the backend, roughly like this:

            for entity_info in self.fetch_job_configs('netflow'):
                for sensor_info in entity_info["sensors"]:
                    # a job is triggered when at least one of its intervals matches;
                    # job_id must be a unique, permanent identifier of the job
                    # (changing it causes the job to be rescheduled):
                    intervals = [sensor_info["interval"]]
                    job_id = str(sensor_info["sensor_id"])
                    # don't forget to pass backend_url and bot_token along:
                    job_params = {**sensor_info, "entity_info": entity_info,
                                  "backend_url": self.backend_url, "bot_token": self.bot_token}
                    yield job_id, intervals, NetFlowBot.perform_job, job_params

        Until the frontend supports that, a single hard-coded mock job is emitted.
        """
        mock_params = {
            "entity_info": {
                "account_id": 129104112,
                "entity_id": 123,
            },
            "backend_url": self.backend_url,
            "bot_token": self.bot_token,
        }
        yield 'traffic_per_protocol', [60], NetFlowBot.perform_job, mock_params

    # This method is called whenever the job needs to be done. It gets the
    # parameters and performs fetching of data.
    @staticmethod
    def perform_job(*args, **job_params):
        """Read per-protocol traffic counters from Redis, convert them to
        rates and send the resulting values to Grafolean."""
        counters = r.hgetall(f'{REDIS_PREFIX}_{REDIS_HASH_TRAFFIC_PER_PROTOCOL}')
        entity_info = job_params["entity_info"]
        ts = time.time()

        values = []
        for proto_key, counter in counters.items():
            output_path = f'entity.{entity_info["entity_id"]}.netflow.traffic_per_protocol.{proto_key.decode("utf-8")}'
            # since we are getting the counters, convert them to values:
            delta, dt = convert_counter_to_value(f'{REDIS_PREFIX}_counter_{output_path}', counter, ts)
            if delta is not None:
                values.append({
                    'p': output_path,
                    'v': delta / dt,
                })

        if not values:
            log.warning("No values found to be sent to Grafolean")
            return

        # send the data to Grafolean:
        send_results_to_grafolean(
            job_params['backend_url'],
            job_params['bot_token'],
            entity_info['account_id'],
            values,
        )
93+
94+
95+
def _get_previous_counter_value(counter_ident):
    """Fetch the previously saved (value, timestamp) pair for this counter
    from Redis; return (None, None) when no previous sample exists."""
    saved = r.hgetall(counter_ident)
    if saved:
        return int(saved[b'v']), float(saved[b't'])
    return None, None  # empty dict -> nothing was saved for this counter yet
100+
101+
102+
def _save_current_counter_value(new_value, now, counter_ident):
    """Persist the current counter sample (value + unix timestamp) to Redis
    so that the next run can compute a delta against it."""
    # hset(..., mapping=...) replaces hmset(), which is deprecated in redis-py 3.x
    # and removed in redis-py 4+; behavior is identical for this call:
    r.hset(counter_ident, mapping={b'v': new_value, b't': now})
104+
105+
106+
def convert_counter_to_value(counter_ident, new_value, now):
    """Convert a monotonically increasing counter reading into a delta.

    Saves the new reading (so the next call can diff against it), then
    returns (dv, dt) relative to the previously stored reading, or
    (None, None) when there is no usable previous reading (first run,
    or a counter overflow/reset).
    """
    prev_value, prev_ts = _get_previous_counter_value(counter_ident)
    # reading may arrive as bytes/str holding a float representation:
    current = int(float(new_value))
    _save_current_counter_value(current, now, counter_ident)

    if prev_value is None:
        # no previous counter, can't calculate value:
        log.debug(f"Counter {counter_ident} has no previous value.")
        return None, None
    if current < prev_value:
        # new counter value is lower than the old one, probably overflow: (or reset)
        log.warning(f"Counter overflow detected for counter {counter_ident}, discarding value - if this happens often, consider decreasing polling interval.")
        return None, None

    return current - prev_value, now - prev_ts
32121

33122

34123
def wait_for_grafolean(backend_url):

netflowcollector.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/pynetflow')
1717
from pynetflow.main import get_export_packets
1818

19-
from lookup import PROTOCOLS
19+
from lookup import PROTOCOLS, REDIS_HASH_TRAFFIC_PER_PROTOCOL
2020

2121

2222
logging.basicConfig(format='%(asctime)s | %(levelname)s | %(message)s',
@@ -32,6 +32,9 @@
3232
r = redis.Redis(host=REDIS_HOST)
3333

3434

35+
REDIS_PREFIX = 'netflow'
36+
37+
3538
def process_netflow(netflow_port):
3639
for ts, client, export in get_export_packets('0.0.0.0', NETFLOW_PORT):
3740
data = defaultdict(int)
@@ -40,10 +43,10 @@ def process_netflow(netflow_port):
4043
in_bytes = flow.data['IN_BYTES']
4144

4245
protocol_str = PROTOCOLS.get(protocol, f'?{protocol}')
43-
data[f'protocol_{protocol_str}_traffic'] += in_bytes
46+
data[protocol_str] += in_bytes
4447

4548
for k, v in data.items():
46-
r.incrby(k, v)
49+
r.hincrby(f'{REDIS_PREFIX}_{REDIS_HASH_TRAFFIC_PER_PROTOCOL}', k, v)
4750

4851

4952
if __name__ == "__main__":

0 commit comments

Comments (0)