Skip to content

Commit

Permalink
Update to use Prometheus Pusher context manager, rucio#129
Browse files Browse the repository at this point in the history
  • Loading branch information
voetberg committed Feb 15, 2024
1 parent 86f8be2 commit ef2d176
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 374 deletions.
44 changes: 17 additions & 27 deletions common/check_expired_dids
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) 2013
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -8,51 +8,41 @@
# Authors:
# - Vincent Garonne, <[email protected]>, 2013
# - Thomas Beermann, <[email protected]>, 2019
# - Eric Vaandering <[email protected]>, 2020-2021
# - Eric Vaandering <[email protected]>, 2020-2022
# - Maggie Voetberg <[email protected]>, 2024

"""
Probe to check the backlog of expired dids.
"""
from __future__ import print_function

import sys
import traceback
from datetime import datetime
from sqlalchemy import func

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get
from rucio.db.sqla import models
from rucio.db.sqla.session import get_session
from rucio.db.sqla.util import get_count

from utils.common import probe_metrics
from utils.common import PrometheusPusher


# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":
try:
registry = CollectorRegistry()
session = get_session()
query = session.query(models.DataIdentifier.scope).filter(models.DataIdentifier.expired_at.isnot(None),
models.DataIdentifier.expired_at < datetime.utcnow())
result = get_count(query)
# Possible check against a threshold. If result > max_value then sys.exit(CRITICAL)
probe_metrics.gauge(name='undertaker.expired_dids').set(result)
Gauge('undertaker_expired_dids', '', registry=registry).set(result)

if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_expired_dids', registry=registry)
except:
continue

print(result)
with PrometheusPusher() as manager:

query = (session.query(func.count(models.DataIdentifier.scope))
.filter(models.DataIdentifier.expired_at.isnot(None),
models.DataIdentifier.expired_at < datetime.utcnow()))
result = query.scalar() or 0
# Possible check against a threshold. If result > max_value then sys.exit(CRITICAL)

manager.gauge('expired_dids.total',
documentation="All expired dids").set(result)

except:
print(traceback.format_exc())
sys.exit(UNKNOWN)
Expand Down
175 changes: 82 additions & 93 deletions common/check_fts_backlog
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
"""
Copyright European Organization for Nuclear Research (CERN) 2013
Expand All @@ -9,27 +9,23 @@
Authors:
- Cedric Serfon, <[email protected]>, 2014-2018
- Mario Lassnig, <[email protected]>, 2015
- Eric Vaandering, <[email protected]>, 2019-2021
- Eric Vaandering, <[email protected]>, 2019-2022
- Thomas Beermann, <[email protected]>, 2019
- Maggie Voetberg, <[email protected]>, 2024
"""
from __future__ import print_function

import os
import sys
from urllib.parse import urlparse

import requests
import urllib3
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get, config_get_bool
from rucio.core.distance import update_distances
from rucio.db.sqla.session import BASE, get_session

from utils.common import probe_metrics
from utils.common import PrometheusPusher


OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3

Expand All @@ -40,10 +36,6 @@ if BASE.metadata.schema:
else:
schema = ''

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":

se_matrix = {}
Expand Down Expand Up @@ -82,87 +74,84 @@ if __name__ == "__main__":
except Exception as error:
UPDATE_DIST = True

registry = CollectorRegistry()
g = Gauge('fts_submitted', '', labelnames=('hostname',), registry=registry)
errmsg = ''
for ftshost in FTSHOSTS.split(','):
print("=== %s ===" % ftshost)
parsed_url = urlparse(ftshost)
scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port
retvalue = CRITICAL
url = '%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO)
busy_channels = []
busylimit = 5000
for attempt in range(0, 5):
result = None
try:
result = requests.get(url, verify=False, cert=(PROXY, PROXY))
res = result.json()
for channel in res['overview']['items']:
src = channel['source_se']
dst = channel['dest_se']
if (src, dst) not in se_matrix:
se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0,
'transfer_speed': 0, 'mbps_link': 0}
for state in ['submitted', 'active', 'finished', 'failed']:
with PrometheusPusher() as manager:

errmsg = ''
for ftshost in FTSHOSTS.split(','):
print("=== %s ===" % ftshost)
parsed_url = urlparse(ftshost)
scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port
retvalue = CRITICAL
url = '%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO)
busy_channels = []
busylimit = 5000
for attempt in range(0, 5):
result = None
try:
result = requests.get(url, verify=False, cert=(PROXY, PROXY))
res = result.json()
for channel in res['overview']['items']:
src = channel['source_se']
dst = channel['dest_se']
if (src, dst) not in se_matrix:
se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0,
'transfer_speed': 0, 'mbps_link': 0}
for state in ['submitted', 'active', 'finished', 'failed']:
try:
se_matrix[(src, dst)][state] += channel[state]
except Exception:
pass
try:
se_matrix[(src, dst)][state] += channel[state]
se_matrix[(src, dst)]['transfer_speed'] += channel['current']
se_matrix[(src, dst)]['mbps_link'] += channel['current']
except Exception:
pass
try:
se_matrix[(src, dst)]['transfer_speed'] += channel['current']
se_matrix[(src, dst)]['mbps_link'] += channel['current']
except Exception:
pass
if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit:
url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % (ftshost, VO,
src, dst)
activities = {}
try:
s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY))
for key, val in s.json().items():
activities[key] = val['SUBMITTED']
except Exception as error:
pass
busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'],
'activities': activities})
summary = res['summary']
hostname = hostname.replace('.', '_')
print('%s : Submitted : %s' % (hostname, summary['submitted']))
print('%s : Active : %s' % (hostname, summary['active']))
print('%s : Staging : %s' % (hostname, summary['staging']))
print('%s : Started : %s' % (hostname, summary['started']))
if busy_channels != []:
print('Busy channels (>%s submitted):' % busylimit)
for bc in busy_channels:
activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()])
print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'],
str(activities_str)))
probe_metrics.gauge('fts3.{hostname}.submitted').labels(hostname=hostname).set(summary['submitted']
+ summary['active']
+ summary['staging']
+ summary['started'])

g.labels(**{'hostname': hostname}).set((summary['submitted'] + summary['active'] + summary['staging'] + summary['started']))
retvalue = OK
break
except Exception as error:
retvalue = CRITICAL
if result and result.status_code:
errmsg = 'Error when trying to get info from %s : HTTP status code %s. [%s]' % (
ftshost, str(result.status_code), str(error))
else:
errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error))
if retvalue == CRITICAL:
print("All attempts failed. %s" % errmsg)
WORST_RETVALUE = max(retvalue, WORST_RETVALUE)

if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_fts_backlog', registry=registry)
except:
continue

if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit:
url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % (ftshost, VO, src, dst)
activities = {}
try:
s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY))
for key, val in s.json().items():
activities[key] = val['SUBMITTED']
except Exception as error:
pass
busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'],
'activities': activities})
summary = res['summary']
hostname = hostname.replace('.', '_')
# If printing these individually is important, why not monitor them separately?
print('%s : Submitted : %s' % (hostname, summary['submitted']))
print('%s : Active : %s' % (hostname, summary['active']))
print('%s : Staging : %s' % (hostname, summary['staging']))
print('%s : Started : %s' % (hostname, summary['started']))

if busy_channels != []:
print('Busy channels (>%s submitted):' % busylimit)
for bc in busy_channels:
activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()])
print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'],
str(activities_str)))

# Add to metrics
backlog_count = summary['submitted'] + summary['active'] + summary['staging'] + summary['started']
manager.gauge(
"fts_backlog.submitted.{hostname}",
documentation="All submitted, active, staged, or stated in FTS queue").labels(hostname=hostname).set(backlog_count)

retvalue = OK
break
except Exception as error:
retvalue = CRITICAL
if result and result.status_code:
errmsg = 'Error when trying to get info from %s : HTTP status code %s. [%s]' % (
ftshost, str(result.status_code), str(error))
else:
errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error))
if retvalue == CRITICAL:
print("All attempts failed. %s" % errmsg)
WORST_RETVALUE = max(retvalue, WORST_RETVALUE)


if not UPDATE_DIST:
sys.exit(WORST_RETVALUE)
Expand Down Expand Up @@ -190,10 +179,10 @@ if __name__ == "__main__":
except:
sys.exit(WORST_RETVALUE)

# Does this not do the same thing as the query? Why the duplicate?
for source_rse, dest_rse in se_matrix:
for source_rse_id in se_map[source_rse]:
for dest_rse_id in se_map[dest_rse]:
# print source_rse_id, dest_rse_id, se_matrix[(source_rse, dest_rse)]
update_distances(src_rse_id=source_rse_id, dest_rse_id=dest_rse_id,
parameters=se_matrix[(source_rse, dest_rse)], session=None)
sys.exit(WORST_RETVALUE)
42 changes: 17 additions & 25 deletions common/check_messages_to_submit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# Copyright European Organization for Nuclear Research (CERN) 2013
#
# Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -8,19 +8,18 @@
# Authors:
# - Mario Lassnig, <[email protected]>, 2013-2014
# - Thomas Beermann, <[email protected]>, 2019
# - Eric Vaandering, <[email protected]>, 2022
# - Maggie Voetberg, <[email protected]>, 2024

"""
Probe to check the queues of messages to submit by Hermes to the broker
"""
from __future__ import print_function

import sys
from sqlalchemy.sql import text as sql_text

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
from rucio.common.config import config_get
from rucio.db.sqla.session import BASE, get_session

from utils.common import probe_metrics
from utils.common import PrometheusPusher

# Exit statuses
OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3
Expand All @@ -32,31 +31,24 @@ else:

queue_sql = """SELECT COUNT(*) FROM {schema}messages""".format(schema=schema)

PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
if PROM_SERVERS != '':
PROM_SERVERS = PROM_SERVERS.split(',')

if __name__ == "__main__":
try:
registry = CollectorRegistry()
session = get_session()
result = session.execute(queue_sql).fetchall()
print('queues.messages %s' % result[0][0])
probe_metrics.gauge(name='queues.messages').set(result[0][0])
Gauge('hermes_queues_messages', '', registry=registry).set(result[0][0])

if len(PROM_SERVERS):
for server in PROM_SERVERS:
try:
push_to_gateway(server.strip(), job='check_messages_to_submit', registry=registry)
except:
continue

if result[0][0] > 100000:
# Count the rows in the messages table and record the value as a gauge.
# NOTE(review): PrometheusPusher is presumably responsible for pushing the
# gauge when the context exits — confirm against utils.common.
with PrometheusPusher() as manager:
    result = session.execute(sql_text(queue_sql)).fetchall()
    message_count = result[0][0]
    print(f"Messages in queue: {message_count}")

    manager.gauge(
        "messages_to_submit.queues.messages",
        documentation="Messages in queue, to submit").set(message_count)

# Test the higher threshold first: any count above 1,000,000 is also above
# 100,000, so checking the WARNING limit first made CRITICAL unreachable.
if message_count > 1000000:
    sys.exit(CRITICAL)
elif message_count > 100000:
    sys.exit(WARNING)

except Exception as e:
print(f"Error: {e}")
sys.exit(UNKNOWN)
sys.exit(OK)
Loading

0 comments on commit ef2d176

Please sign in to comment.