This repository has been archived by the owner on Mar 9, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsync.py
118 lines (89 loc) · 3.54 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import logging
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
import requests
import schedule
# Logging: timestamped messages, INFO level and above
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s")

# VictoriaMetrics target: the write URL plus the measurement and instance
# names used when building each written line. Every value can be overridden
# through the environment variable of the same (upper-case) name.
victoriametrics_url = os.environ.get(
    "VICTORIAMETRICS_URL", "http://localhost:8428/write"
)
measurement_name = os.environ.get("MEASUREMENT_NAME", "default_measurement")
instance_name = os.environ.get("INSTANCE_NAME", "127.0.0.1:80")

# HTTP endpoint the JSON document is downloaded from
http_endpoint = os.environ.get("HTTP_ENDPOINT", "http://localhost/status")

# Timestamp of the last successful job. It is seeded with the start-up time,
# so the health check compares against process start until the first write.
last_successful_job_timestamp = time.time()
# Function to download JSON from the HTTP endpoint
def download_json(timeout=10):
    """Fetch the document at ``http_endpoint`` and decode it as JSON.

    Args:
        timeout: Seconds to wait for the endpoint before giving up. Without
            a timeout a stalled endpoint would block the caller (and with it
            the scheduler and health-check loop) indefinitely.

    Returns:
        The decoded JSON value, or ``None`` when the request fails, the
        response status is not 200, or the body is not valid JSON. Every
        failure is logged.
    """
    try:
        response = requests.get(http_endpoint, timeout=timeout)
        # Turns 4xx/5xx responses into an HTTPError handled below.
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error(f"Connection error: {e}")
        return None

    # Anything other than a plain 200 is still treated as a failure.
    if response.status_code != 200:
        logging.error(f"Error downloading JSON: {response.status_code}")
        return None

    try:
        return response.json()
    except ValueError as e:
        # A malformed body raises a ValueError subclass; depending on the
        # requests version it is not a RequestException, so catch it here
        # instead of letting it crash the scheduled job.
        logging.error(f"Error decoding JSON: {e}")
        return None
# Function to write data to VictoriaMetrics
def write_to_victoriametrics(data, timeout=10):
    """Post the values in ``data`` to VictoriaMetrics as a single line.

    The line has the form
    ``<measurement>,instance=<instance>,job=<measurement> k1=v1,k2=v2`` with
    keys lower-cased. The ``"Mac"`` entry is skipped because it is not a
    metric. On success ``last_successful_job_timestamp`` is refreshed.

    Args:
        data: Mapping of field names to values, as decoded from the JSON
            document. It is not modified.
        timeout: Seconds to wait for VictoriaMetrics before giving up, so a
            stalled server cannot block the caller indefinitely.

    Returns:
        None. Nothing is written when ``data`` is not a mapping or when its
        ``"Cnt"`` value is zero or missing. Request failures are logged.
    """
    global last_successful_job_timestamp

    # The endpoint may return any JSON value; only an object carries fields.
    if not isinstance(data, dict):
        logging.error(f"Unexpected JSON payload of type {type(data).__name__}.")
        return

    if data.get("Cnt", 0) == 0:
        logging.info("'Cnt' value is zero. No write operation performed.")
        return

    # "Mac" is not a metric. Filter it out rather than deleting it: deletion
    # raised KeyError when the key was absent and mutated the caller's dict.
    fields = ",".join(
        f"{key.lower()}={value}" for key, value in data.items() if key != "Mac"
    )
    payload = (
        f"{measurement_name},instance={instance_name},job={measurement_name} "
        f"{fields}\n"
    )
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }

    try:
        response = requests.post(
            victoriametrics_url,
            headers=headers,
            data=payload,
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logging.error(f"Error writing data to VictoriaMetrics: {e}")
        return

    # Update the timestamp of the last successful job
    last_successful_job_timestamp = time.time()
    logging.info("Data successfully written to VictoriaMetrics.")
# Function to sync data
def job():
    """Run one sync cycle: download the JSON and hand it to the writer.

    A falsy download result (``None`` on failure, or an empty document) is
    logged as an error and nothing is written.
    """
    logging.info("Downloading JSON with data...")
    fetched = download_json()
    if not fetched:
        logging.error("Error in downloading JSON or no data available.")
        return
    write_to_victoriametrics(fetched)
# Register the sync job with the scheduler so it is due once every minute
schedule.every().minute.do(job)
# HTTP handler for health check
class HealthHandler(BaseHTTPRequestHandler):
    """Report whether the last successful job is recent enough."""

    def do_GET(self):
        """Answer 200 "OK" if the last success is at most 300 s old, else 500."""
        age = time.time() - last_successful_job_timestamp
        if age <= 300:
            status, body = 200, "OK"
        else:
            status = 500
            body = "Error: Last successful job is more than 5 minutes old"

        self.send_response(status)
        self.send_header("Content-type", "text/plain")
        self.end_headers()
        self.wfile.write(bytes(body, "utf8"))
# Set up the HTTP server
port = 8080
http_server = HTTPServer(("localhost", port), HealthHandler)
# handle_request() blocks until a request arrives unless the server has a
# timeout. Without one, scheduled jobs only ran after someone queried the
# health endpoint. Bounding the wait to one second keeps the scheduler
# ticking and also paces the loop, so no separate sleep is needed.
http_server.timeout = 1
logging.info(f"Health check server started on port {port}")

# Main loop: run any due jobs, then serve at most one health-check request
while True:
    schedule.run_pending()
    http_server.handle_request()