-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathprometheus-ecs-sd.py
executable file
·187 lines (167 loc) · 8.39 KB
/
prometheus-ecs-sd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/python
import argparse
import signal
import boto3
import logging
from aiohttp import web
import asyncio
import yaml
import sys
# Root logging configuration: timestamped format shared by this module and aiohttp's access log.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)  # level is adjusted later by the --log flag in parse_args()
def parse_args():
    """Parse command-line options and apply the requested logging level.

    Returns the argparse.Namespace with: file, cluster, interval, log, port.
    """
    p = argparse.ArgumentParser(prog='prometheus-ecs-sd', description='Prometheus file discovery for AWS ECS')
    p.add_argument('-f', '--file', type=str, default='/tmp/ecs_file_sd.yml', help='File to write tasks (default: /tmp/ecs_file_sd.yml)')
    p.add_argument('-c', '--cluster', type=str, default='', help='Return metrics only for this Cluster name (default: all)')
    p.add_argument('-i', '--interval', type=int, default=60, help='Interval to discover ECS tasks, seconds (default: 60)')
    p.add_argument('-l', '--log', choices=['debug', 'info', 'warn'], default='info', help='Logging level (default: info)')
    p.add_argument('-p', '--port', type=int, default=8080, help='Port to serve /metrics (default: 8080)')
    opts = p.parse_args()
    # Map the lowercase choice onto the logging module's level constant.
    level = getattr(logging, opts.log.upper())
    logger.setLevel(level)
    return opts
class Discoverer:
    """Discovers Prometheus scrape targets from ECS tasks on EC2 launch type
    and writes them to a file_sd-compatible YAML file.

    Containers opt in via docker labels:
      PROMETHEUS_SCRAPES  comma-separated "port[/path]" entries
      PROMETHEUS_LABELS   comma-separated "key=value" extra labels
    """

    def __init__(self, file, cluster):
        # file: path of the file_sd YAML rewritten on every discovery pass
        # cluster: restrict discovery to this cluster name ('' = all clusters)
        self.file = file
        self.cluster = cluster
        self.tasks = {}  # task ARN -> list of file_sd target dicts (cache)
        self.hosts = {}  # container-instance ARN -> {'id': ..., 'ip': ...} (cache)
        try:
            self.ecs = boto3.client('ecs')
            self.ec2 = boto3.client('ec2')
            self.ecs.list_clusters()  # check creds on start: fail fast, not on first tick
        except Exception as e:
            sys.exit(e)

    async def loop(self, interval):
        """Run discover() every `interval` seconds forever, dropping caches daily."""
        signal.signal(signal.SIGINT, self.signal_handler)
        el = asyncio.get_event_loop()
        i = 0
        while True:
            try:
                # discover() is blocking boto3 work: run it in the default executor
                # and bound it so a slow AWS API cannot stall the event loop.
                await asyncio.wait_for(el.run_in_executor(None, self.discover), timeout=interval)
            except asyncio.TimeoutError:
                logger.error('Timeout while reading ECS Tasks! Try to increase --interval')
            except Exception:
                logger.error('Read tasks error:', exc_info=True)
            await asyncio.sleep(interval)
            i += 1
            # Drop caches roughly once a day (1440 ticks at the default 60s interval)
            # so redeployed tasks and replaced EC2 instances don't go stale.
            if i > 1440:
                i = 0
                self.tasks = {}
                self.hosts = {}  # fix: previously only tasks were dropped; hosts grew unbounded

    def discover(self):
        """Scan all (or the configured) ECS clusters and rewrite self.file."""
        targets = []
        tasks = 0
        for cluster in self.ecs.list_clusters().get('clusterArns', []):
            if self.cluster and cluster.split('/')[-1] != self.cluster:
                continue
            for page in self.ecs.get_paginator('list_tasks').paginate(cluster=cluster, launchType='EC2'):
                for arn in page.get('taskArns', []):
                    targets += self.check_task(cluster=cluster, arn=arn)
                    tasks += 1
        logger.info(f"Discovered {len(targets)} targets from {tasks} tasks")
        with open(self.file, 'w') as f:
            yaml.dump(targets, f)

    def check_task(self, cluster, arn):
        """Return file_sd entries ({'targets': [...], 'labels': {...}}) for one task.

        Results are cached per task ARN. Returns [] without caching while the
        task is not yet placed on an instance or its ports are not yet bound.
        """
        if arn not in self.tasks:
            task = self.ecs.describe_tasks(cluster=cluster, tasks=[arn])['tasks'][0]
            td = self.ecs.describe_task_definition(taskDefinition=task['taskDefinitionArn'])['taskDefinition']
            if 'containerInstanceArn' not in task:  # not yet mapped, skip caching
                return []
            ip = self.get_host_ip(cluster, task['containerInstanceArn'])
            sd = []
            for container in td['containerDefinitions']:
                scrapes = container.get('dockerLabels', {}).get('PROMETHEUS_SCRAPES')
                if not scrapes:
                    continue  # container did not opt in to scraping
                labels = self.get_labels(container.get('dockerLabels', {}).get('PROMETHEUS_LABELS'))
                labels['container_name'] = container['name']
                labels['task_name'] = td['family']
                labels['task_revision'] = td['revision']
                # The running container entry matching this definition by name.
                tc = next(x for x in task['containers'] if x['name'] == container['name'])
                labels['container_arn'] = tc.get('containerArn', '')
                labels['__container_image'] = tc.get('image', '')
                labels['__task_group'] = task.get('group', '')
                labels['__container_runtime_id'] = tc.get('runtimeId', '')
                labels['instance_id'] = self.hosts[task['containerInstanceArn']]['id']
                for port in scrapes.split(','):
                    tmp = labels.copy()
                    if '/' in port:  # "9100/custom/path" => port 9100, path /custom/path
                        port, path = port.split('/', maxsplit=1)
                        tmp['__metrics_path__'] = f'/{path}'
                    port = self.get_mapped_port(int(port), container, task['containers'])
                    if port is None:  # not yet mapped, skip caching
                        return []
                    sd.append({
                        'targets': [f'{ip}:{port}'],
                        'labels': tmp
                    })
            self.tasks[arn] = sd
            logger.debug(f'Got task {arn} obj: {self.tasks[arn]}')
        return self.tasks[arn]

    def get_host_ip(self, cluster, arn):
        """Return (and cache) the private IP of the EC2 host behind a container instance ARN."""
        if arn not in self.hosts:
            instance_id = self.ecs.describe_container_instances(cluster=cluster, containerInstances=[arn])['containerInstances'][0]['ec2InstanceId']
            self.hosts[arn] = {
                'id': instance_id,
                'ip': self.ec2.describe_instances(InstanceIds=[instance_id])['Reservations'][0]['Instances'][0]['PrivateIpAddress']
            }
            logger.debug(f'Got host {arn} IP: {self.hosts[arn]["ip"]}')
        return self.hosts[arn]["ip"]

    # "__scheme__=https,skip_15s=true" => {"__scheme__": "https", "skip_15s": "true"}
    @staticmethod
    def get_labels(raw):
        """Parse a PROMETHEUS_LABELS docker label into a dict; {} if empty/invalid."""
        if not raw:
            return {}
        try:
            return dict(x.split('=', maxsplit=1) for x in raw.split(','))
        except ValueError:  # an entry without '=' makes dict() fail
            logger.warning(f'Unable to parse Labels: {raw}')
            # Fix: previously fell through and returned None, which crashed the
            # caller on labels['container_name'] = ...
            return {}

    # find host 'port' mapping of container 'definition' in running 'containers'
    @staticmethod
    def get_mapped_port(port, definition, containers):
        """Resolve a container port to its host port; None if not yet bound."""
        portmap = [x for x in definition.get('portMappings', []) if x['containerPort'] == port]
        if not portmap:
            return port  # hostNet: container port is reachable as-is
        if portmap[0]['hostPort'] == 0:  # dynamic host ports
            for container in containers:
                if container['name'] == definition['name']:
                    if 'networkBindings' not in container:
                        logger.info(f'Container {container["name"]} is not yet mapped to host port, skipping')
                        return None
                    for bind in container['networkBindings']:
                        if bind['containerPort'] == port:
                            return bind['hostPort']
            return None  # explicit: no binding found for this port yet
        return portmap[0]['hostPort']  # statically mapped port

    @staticmethod
    def signal_handler(num, frame):
        # Exit cleanly on SIGINT instead of dumping a KeyboardInterrupt trace.
        sys.exit(0)
class Metrics:
    """Serves /metrics with per-service ECS task counters
    (ecs_service_{desired,running,pending}_tasks)."""

    def __init__(self, cluster):
        # cluster: restrict metrics to this cluster name ('' = all clusters)
        self.cluster = cluster
        self.ecs = boto3.client('ecs')

    def _collect(self):
        """Blocking scrape of ECS service counters; returns Prometheus exposition text."""
        lines = []
        for cluster in self.ecs.list_clusters().get('clusterArns', []):
            if self.cluster and cluster.split('/')[-1] != self.cluster:
                continue
            for page in self.ecs.get_paginator('list_services').paginate(cluster=cluster):
                arns = page.get('serviceArns', [])
                # describe_services accepts up to 10 services per call;
                # batching avoids one API round-trip per service.
                for i in range(0, len(arns), 10):
                    for service in self.ecs.describe_services(cluster=cluster, services=arns[i:i + 10])['services']:
                        lines.append(f'ecs_service_desired_tasks{{service="{service["serviceName"]}"}} {service["desiredCount"]}\n')
                        lines.append(f'ecs_service_running_tasks{{service="{service["serviceName"]}"}} {service["runningCount"]}\n')
                        lines.append(f'ecs_service_pending_tasks{{service="{service["serviceName"]}"}} {service["pendingCount"]}\n')
        return ''.join(lines)

    async def handler(self, request):
        """aiohttp GET /metrics handler.

        boto3 is synchronous, so the scrape runs in the default executor —
        previously it ran inline and blocked the event loop for the whole
        AWS round-trip, stalling concurrent requests and the discovery loop.
        """
        res = await asyncio.get_event_loop().run_in_executor(None, self._collect)
        return web.Response(text=res)
async def start_background_tasks(app):
    """aiohttp on_startup hook: spawn the periodic ECS discovery loop."""
    opts = app['args']
    worker = Discoverer(opts.file, opts.cluster)
    # Keep the Task on the app so on_cleanup can cancel and await it.
    app['discovery'] = asyncio.create_task(worker.loop(opts.interval))
async def cleanup_background_tasks(app):
    """aiohttp on_cleanup hook: stop the discovery task and wait for it to finish."""
    app['discovery'].cancel()
    # Awaiting a cancelled task re-raises CancelledError; swallow it so the
    # aiohttp shutdown sequence is not aborted by our own cancellation.
    try:
        await app['discovery']
    except asyncio.CancelledError:
        pass
if __name__ == "__main__":
    # Entry point: parse flags, wire up the aiohttp app, and serve until killed.
    args = parse_args()
    logger.debug(f"Starting with args: {args}")
    app = web.Application()
    app['args'] = args  # settings consumed by start_background_tasks
    app.router.add_get("/metrics", Metrics(args.cluster).handler)  # ECS service counters
    app.on_startup.append(start_background_tasks)    # launches Discoverer.loop
    app.on_cleanup.append(cleanup_background_tasks)  # cancels it on shutdown
    # Reuse the module logger for aiohttp access logs.
    web.run_app(app, port=args.port, access_log=logger)