Skip to content

Commit 223c28f

Browse files
committed
Merge branch 'feature/remote-config' into 'master'
Feature/remote config See merge request grafolean/grafolean-worker-ping!1
2 parents e9d8794 + 9fc66b9 commit 223c28f

File tree

3 files changed

+225
-92
lines changed

3 files changed

+225
-92
lines changed

Pipfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@ url = "https://pypi.org/simple"
44
verify_ssl = true
55

66
[dev-packages]
7+
pylint = "*"
78

89
[packages]
910
requests = "*"
1011
multiping = "*"
12+
ansicolors = "*"
13+
grafoleancollector = "*"
14+
python-dotenv = "*"
1115

1216
[requires]
1317
python_version = "3.6"

Pipfile.lock

Lines changed: 130 additions & 11 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

grafolean-worker-ping.py

Lines changed: 91 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,87 +1,97 @@
1-
from collections import defaultdict
2-
from multiping import MultiPing
31
import os
2+
import time
3+
from collections import defaultdict
4+
from colors import color
5+
import logging
6+
from multiping import multi_ping
7+
from pytz import utc
48
import requests
59
import socket
6-
import time
10+
import dotenv
11+
12+
13+
from grafoleancollector import Collector
14+
15+
16+
logging.basicConfig(format='%(asctime)s | %(levelname)s | %(message)s',
17+
datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG)
18+
logging.addLevelName(logging.DEBUG, color("DBG", 7))
19+
logging.addLevelName(logging.INFO, "INF")
20+
logging.addLevelName(logging.WARNING, color('WRN', fg='red'))
21+
logging.addLevelName(logging.ERROR, color('ERR', bg='red'))
22+
log = logging.getLogger("{}.{}".format(__name__, "base"))
23+
24+
25+
def send_results_to_grafolean(backend_url, bot_token, account_id, values):
26+
url = '{}/accounts/{}/values/?b={}'.format(backend_url, account_id, bot_token)
27+
28+
if len(values) == 0:
29+
log.warning("No results available to be sent to Grafolean, skipping.")
30+
return
31+
32+
log.info("Sending results to Grafolean")
33+
try:
34+
r = requests.post(url, json=values)
35+
r.raise_for_status()
36+
log.info("Results sent: {}".format(values))
37+
except:
38+
log.exception("Error sending data to Grafolean")
39+
40+
41+
class PingCollector(Collector):
42+
43+
def jobs(self):
44+
for entity_info in self.fetch_job_configs('ping'):
45+
intervals = list(set([sensor_info["interval"] for sensor_info in entity_info["sensors"]]))
46+
job_info = { **entity_info, "backend_url": self.backend_url, "bot_token": self.bot_token }
47+
job_id = str(entity_info["entity_id"])
48+
yield job_id, intervals, PingCollector.do_ping, job_info
49+
50+
@staticmethod
51+
def do_ping(*args, **job_info):
52+
# filter out only those sensors that are supposed to run at this interval:
53+
affecting_intervals, = args
54+
hostname = job_info["details"]["ipv4"]
55+
cred = job_info["credential_details"]
56+
57+
activated_sensors = [s for s in job_info["sensors"] if s["interval"] in affecting_intervals]
58+
if not activated_sensors:
59+
return
60+
61+
# perform ping:
62+
values = []
63+
addrs = [hostname]
64+
timeout = float(cred.get("timeout", 2.0))
65+
retry = int(cred.get("retry", 0))
66+
n_packets = int(cred.get("n_packets", 3))
67+
sleep_packets = float(cred.get("sleep_packets", 1.0))
68+
output_path_prefix = f'entity.{job_info["entity_id"]}.ping'
69+
n_ok = 0
70+
for i in range(n_packets):
71+
responses, _ = multi_ping(addrs, timeout=timeout, retry=retry)
72+
73+
# save results:
74+
if len(responses) > 0:
75+
values.append({'p': f"{output_path_prefix}.p{i}.ok", 'v': 1.0})
76+
values.append({'p': f"{output_path_prefix}.p{i}.rtt", 'v': list(responses.values())[0]})
77+
n_ok += 1
78+
else:
79+
values.append({'p': f"{output_path_prefix}.p{i}.ok", 'v': 0.0})
80+
time.sleep(sleep_packets)
81+
82+
values.append({'p': f"{output_path_prefix}.success", 'v': float(n_ok) / n_packets})
83+
84+
send_results_to_grafolean(job_info['backend_url'], job_info['bot_token'], job_info['account_id'], values)
785

8-
N_PINGS = 3
9-
10-
# This is copy-pasted from multiping package; the reason is that we need to get MultiPing
11-
# instance, because it holds the IP addresses which correspond to the addresses we wanted
12-
# pinged - and the key in ping results is the IP.
13-
def multi_ping(dest_addrs, timeout, retry=0, ignore_lookup_errors=False):
14-
retry_timeout = float(timeout) / (retry + 1)
15-
16-
mp = MultiPing(dest_addrs, ignore_lookup_errors=ignore_lookup_errors)
17-
18-
results = {}
19-
retry_count = 0
20-
while retry_count <= retry:
21-
# Send a batch of pings
22-
mp.send()
23-
single_results, no_results = mp.receive(retry_timeout)
24-
# Add the results from the last sending of pings to the overall results
25-
results.update(single_results)
26-
if not no_results:
27-
# No addresses left? We are done.
28-
break
29-
retry_count += 1
30-
31-
return results, no_results, mp
32-
33-
def get_addr_for_ip_dict(addrs, mp):
34-
# This is complicated, and a hack. Still... mp (MultiPing instance) holds two lists,
35-
# self._dest_addrs and self._unprocessed_targets. List _unprocessed_targets has the addresses
36-
# that couldn't be resolved. Others were resolved, and _dest_addrs has the IPs in the same
37-
# order as original addresses.
38-
resolved_addrs = [a for a in addrs if a not in mp._unprocessed_targets]
39-
ip_to_addr = {k: v for k, v in zip(mp._dest_addrs, resolved_addrs)}
40-
return ip_to_addr
41-
42-
def do_ping(addrs):
43-
results = defaultdict(list)
44-
mp = None
45-
ip_to_addr = {}
46-
for i in range(N_PINGS):
47-
print(".")
48-
responses, no_responses, mp = multi_ping(addrs, timeout=2, retry=3, ignore_lookup_errors=True)
49-
50-
# Some addresses (like demo.grafolean.com) resolve to multiple IPs, so each call to multi_ping will
51-
# resolve differently - we must find the new IP addresses every time:
52-
ip_to_addr = get_addr_for_ip_dict(addrs, mp)
53-
54-
for no_resp in no_responses:
55-
addr = ip_to_addr.get(no_resp, no_resp)
56-
results[addr].append(None)
57-
for resp, t in responses.items():
58-
addr = ip_to_addr.get(resp, resp)
59-
results[addr].append(t)
60-
61-
if i < N_PINGS - 1:
62-
time.sleep(1)
63-
return dict(results)
64-
65-
def send_results_to_grafolean(base_url, account_id, bot_token, results):
66-
url = '{}/api/accounts/{}/values/?b={}'.format(base_url, account_id, bot_token)
67-
values = []
68-
for ip in results:
69-
for ping_index, ping_time in enumerate(results[ip]):
70-
values.append({
71-
'p': 'ping.{}.{}.success'.format(ip.replace('.', '_'), ping_index),
72-
'v': 0 if ping_time is None else 1,
73-
})
74-
if ping_time is not None:
75-
values.append({
76-
'p': 'ping.{}.{}.rtt'.format(ip.replace('.', '_'), ping_index),
77-
'v': ping_time,
78-
})
79-
print("Sending results to Grafolean")
80-
r = requests.post(url, json=values)
81-
print(r.text)
82-
r.raise_for_status()
8386

8487
if __name__ == "__main__":
85-
addrs = ["8.8.8.8", "youtube.com", "127.0.0.1", "demo.grafolean.com", "grafolean.com", "whateverdoeesndfexist.com"]
86-
results = do_ping(addrs)
87-
send_results_to_grafolean(os.environ.get('BACKEND_URL'), 1, os.environ.get('BOT_TOKEN'), results)
88+
dotenv.load_dotenv()
89+
90+
backend_url = os.environ.get('BACKEND_URL')
91+
bot_token = os.environ.get('BOT_TOKEN')
92+
if not backend_url or not bot_token:
93+
raise Exception("Please specify BACKEND_URL and BOT_TOKEN env vars.")
94+
jobs_refresh_interval = int(os.environ.get('JOBS_REFRESH_INTERVAL', 120))
95+
96+
c = PingCollector(backend_url, bot_token, jobs_refresh_interval)
97+
c.execute()

0 commit comments

Comments
 (0)