-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsync.py
134 lines (113 loc) · 4.31 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
import json
import logging
import os
import yaml
import requests
from http.server import BaseHTTPRequestHandler, HTTPServer
from src.json_log_formatter import JsonFormatter
# Logging setup: a dedicated, named logger at INFO level whose records go
# through a single stream handler formatted by the project's JsonFormatter.
logger = logging.getLogger('LOKI_RULE_GROUP_CONTROLLER')
logger.setLevel(logging.INFO)
json_formatter = JsonFormatter()
handler = logging.StreamHandler()
handler.setFormatter(json_formatter)
logger.addHandler(handler)

# Loki connection settings.
# Rule group definitions are sent to Loki as YAML bodies.
LOKI_POST_HEADERS = {"Content-Type": "application/yaml"}
# Base URL of the Loki API; the lookup raises KeyError at import time when
# the LOKI_API_ENDPOINT environment variable is not set.
LOKI_API_ENDPOINT = os.environ['LOKI_API_ENDPOINT']
def create_or_update_alerting_rule_group(
    rule_namespace,
    yaml_rule_group_definition,
    *,
    timeout=10,
):
    """POST a YAML rule group definition to Loki's rules endpoint.

    Args:
        rule_namespace: Namespace segment appended to
            ``/loki/api/v1/rules/`` in the request URL.
        yaml_rule_group_definition: Request body, sent as-is with the
            ``application/yaml`` content type from ``LOKI_POST_HEADERS``.
        timeout: Seconds to wait for Loki before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            single-threaded webhook server on an unresponsive Loki.

    Returns:
        The ``requests.Response`` from Loki. The HTTP status is NOT checked
        here; callers decide how to treat non-2xx answers.

    Raises:
        requests.exceptions.RequestException: On connection problems or
            when the timeout expires.
    """
    return requests.post(
        f'{LOKI_API_ENDPOINT}/loki/api/v1/rules/{rule_namespace}',
        data=yaml_rule_group_definition,
        headers=LOKI_POST_HEADERS,
        timeout=timeout,
    )
def delete_alerting_rule_group(
    rule_namespace,
    rule_name,
    *,
    timeout=10,
):
    """Send a DELETE for one rule group to Loki's rules endpoint.

    Args:
        rule_namespace: Namespace segment of the request URL.
        rule_name: Rule group name segment of the request URL.
        timeout: Seconds to wait for Loki before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            single-threaded webhook server on an unresponsive Loki.

    Returns:
        The ``requests.Response`` from Loki. The HTTP status is NOT checked
        here; callers can inspect ``response.ok``.

    Raises:
        requests.exceptions.RequestException: On connection problems or
            when the timeout expires.
    """
    return requests.delete(
        f'{LOKI_API_ENDPOINT}/loki/api/v1/rules/{rule_namespace}/{rule_name}',
        timeout=timeout,
    )
def get_alerting_rules(*, timeout=10):
    """Fetch all rule groups from Loki and return the parsed YAML body.

    Args:
        timeout: Seconds to wait for Loki before giving up. Without a
            timeout ``requests`` waits indefinitely.

    Returns:
        Whatever ``yaml.safe_load`` produces for the response text. The HTTP
        status is not checked, so an error body is parsed and returned the
        same way as a successful one.

    Raises:
        requests.exceptions.RequestException: On connection problems or
            when the timeout expires.
        yaml.YAMLError: If the response text is not valid YAML.
    """
    response = requests.get(
        f'{LOKI_API_ENDPOINT}/loki/api/v1/rules',
        timeout=timeout,
    )
    return yaml.safe_load(response.text)
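# NOTE: within this file get_alerting_rules() has no callers, while
# get_alerting_rules_in_namespace() is called by the sync path of
# LokiRuleGroupHandler.do_POST to verify the rule group; the finalize path
# does not read state back from the API.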
def get_alerting_rules_in_namespace(rule_namespace, *, timeout=10):
    """Return the first rule group Loki reports for ``rule_namespace``.

    Args:
        rule_namespace: Namespace segment appended to
            ``/loki/api/v1/rules/`` in the request URL; also the key looked
            up in the parsed response.
        timeout: Seconds to wait for Loki before giving up. Without a
            timeout ``requests`` waits indefinitely.

    Returns:
        Element 0 of the sequence stored under ``rule_namespace`` in the
        YAML-parsed response body.

    Raises:
        requests.exceptions.RequestException: On connection problems or
            when the timeout expires.
        yaml.YAMLError: If the response text is not valid YAML.
        KeyError, IndexError, TypeError: If the parsed body is not a mapping
            holding a non-empty sequence under ``rule_namespace`` (the HTTP
            status is not checked, so an error body ends up here).
    """
    response = requests.get(
        f'{LOKI_API_ENDPOINT}/loki/api/v1/rules/{rule_namespace}',
        timeout=timeout,
    )
    return yaml.safe_load(response.text)[rule_namespace][0]
class LokiRuleGroupHandler(BaseHTTPRequestHandler):
    """Webhook handler that mirrors a parent object's ``spec`` into Loki.

    Every POST body is expected to be JSON with a ``parent`` object. The
    parent's ``spec`` is the rule group definition, and ``spec.name`` is used
    both as the Loki rule namespace and as the rule group name.

    * Paths ending in ``/finalize`` delete the rule group and answer
      ``{"finalized": <bool>}``.
    * Any other path creates/updates the rule group and answers
      ``{"status": {"health": {"status": <str>}}}`` with ``Healthy``,
      ``Progressing`` or ``Degraded``.
    """

    def do_POST(self):
        """Dispatch one webhook call to the finalize or the sync handling.

        A body that cannot be read as JSON containing a ``parent`` key is
        answered with HTTP 400 rather than raising out of the handler. A
        readable body without a usable ``spec.name`` is still answered with
        HTTP 200: ``finalized: true`` on the finalize path and a ``Degraded``
        health status on the sync path.
        """
        try:
            request_data = self._read_request()
            parent = request_data['parent']
        except (KeyError, TypeError, ValueError):
            # TypeError: missing Content-Length or a non-mapping JSON body;
            # ValueError: bad length, undecodable bytes or invalid JSON.
            logger.exception('failed to read webhook request')
            self._send_json({'error': 'malformed request body'}, status_code=400)
            return

        spec = parent.get('spec', {}) if isinstance(parent, dict) else None
        rule_group_namespace = spec.get('name') if isinstance(spec, dict) else None
        if rule_group_namespace in (None, ''):
            # Normalise "no usable name" to None so both paths can test it.
            rule_group_namespace = None
            logger.error('failed to parse request: %s', request_data)

        if self.path.endswith('/finalize'):
            # Handle the finalize hook
            finalized = self._finalize(rule_group_namespace, request_data)
            self._send_json({'finalized': finalized})
            return

        # Sync the object with the external API
        status = self._sync(rule_group_namespace, spec)
        # Prepare the response for Metacontroller
        self._send_json({'status': {'health': {'status': status}}})

    def _read_request(self):
        """Read ``Content-Length`` bytes from the request and parse them as JSON."""
        content_length = int(self.headers['Content-Length'])
        if content_length < 0:
            # A negative size would make rfile.read() wait for end-of-stream.
            raise ValueError(f'invalid Content-Length: {content_length}')
        return json.loads(self.rfile.read(content_length).decode('utf-8'))

    def _send_json(self, payload, status_code=200):
        """Write ``payload`` as a complete JSON HTTP response."""
        body = json.dumps(payload).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _finalize(self, rule_group_namespace, request_data):
        """Delete the rule group and return the value for ``finalized``."""
        if rule_group_namespace is None:
            # Without a name no delete request can be built; report the
            # object as finalized so its deletion is not blocked.
            return True
        try:
            # The same value serves as Loki namespace and as group name.
            response = delete_alerting_rule_group(
                rule_name=rule_group_namespace,
                rule_namespace=rule_group_namespace,
            )
        except Exception:
            # Deliberate best effort: ANY failure to reach Loki (including
            # connection errors) is reported as finalized.
            logger.warning(
                'failed to delete rule group from request, did it exist?: %s',
                request_data,
                exc_info=True,
            )
            return True
        return response.ok

    def _sync(self, rule_group_namespace, spec):
        """Create/update the rule group in Loki and return the health status."""
        if rule_group_namespace is None:
            return 'Degraded'
        rule_group = None
        try:
            rule_group = yaml.dump(spec)
            create_or_update_alerting_rule_group(
                rule_namespace=rule_group_namespace,
                yaml_rule_group_definition=rule_group,
            )
            # check if rule group was created: read it back and compare it
            # with the definition that was just submitted
            stored = get_alerting_rules_in_namespace(
                rule_namespace=rule_group_namespace
            )
            if yaml.safe_load(rule_group) == stored:
                return 'Healthy'
            return 'Progressing'
        except Exception:
            logger.exception('failed to create rule group: %s', rule_group)
            return 'Degraded'
# Start the webhook server at import time: bind to all interfaces on port 80
# and handle requests with LokiRuleGroupHandler until the process exits.
webhook_server = HTTPServer(
    server_address=("", 80),
    RequestHandlerClass=LokiRuleGroupHandler,
)
webhook_server.serve_forever()