From 40aa162cce09c5e2f0ec7e912b4b50c64b8dc6c0 Mon Sep 17 00:00:00 2001 From: Eric Vaandering Date: Thu, 24 Mar 2022 15:03:29 -0500 Subject: [PATCH 1/3] Move probes in common over to context manager and python3 --- common/check_expired_dids | 45 ++++--- common/check_fts_backlog | 177 ++++++++++++++-------------- common/check_messages_to_submit | 33 ++---- common/check_new_dids | 37 +++--- common/check_stuck_rules | 47 ++++---- common/check_transfer_queues_status | 48 ++++---- common/check_unevaluated_dids | 24 ++-- common/check_unlocked_replicas | 43 +++---- common/check_updated_dids | 33 ++---- 9 files changed, 218 insertions(+), 269 deletions(-) diff --git a/common/check_expired_dids b/common/check_expired_dids index 93577d3d..218f16a3 100755 --- a/common/check_expired_dids +++ b/common/check_expired_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,7 +8,7 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 -# - Eric Vaandering , 2020-2021 +# - Eric Vaandering , 2020-2022 """ Probe to check the backlog of expired dids. 
@@ -17,42 +17,37 @@ from __future__ import print_function import sys import traceback -from datetime import datetime -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from datetime import datetime +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla import models from rucio.db.sqla.session import get_session from rucio.db.sqla.util import get_count +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - query = session.query(models.DataIdentifier.scope).filter(models.DataIdentifier.expired_at.isnot(None), - models.DataIdentifier.expired_at < datetime.utcnow()) - result = get_count(query) - # Possible check against a threshold. If result > max_value then sys.exit(CRITICAL) - - monitor.record_gauge('undertaker.expired_dids', value=result) - Gauge('undertaker_expired_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_expired_dids', registry=registry) - except: - continue - - print(result) + with PrometheusPusher(registry, job_name='check_expired_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + query = (session.query(models.DataIdentifier.scope) + .filter(models.DataIdentifier.expired_at.isnot(None), + models.DataIdentifier.expired_at < datetime.utcnow())) + result = get_count(query) + # Possible check against a threshold. 
If result > max_value then sys.exit(CRITICAL) + + monitor.record_gauge('undertaker.expired_dids', value=result) + Gauge(prefix + 'undertaker_expired_dids', '', registry=registry).set(result) + + print(result) except: print(traceback.format_exc()) sys.exit(UNKNOWN) diff --git a/common/check_fts_backlog b/common/check_fts_backlog index 451282c0..8d254acb 100755 --- a/common/check_fts_backlog +++ b/common/check_fts_backlog @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ Copyright European Organization for Nuclear Research (CERN) 2013 @@ -9,28 +9,26 @@ Authors: - Cedric Serfon, , 2014-2018 - Mario Lassnig, , 2015 - - Eric Vaandering, , 2019-2021 + - Eric Vaandering, , 2019-2022 - Thomas Beermann, , 2019 """ -from __future__ import print_function + import os import sys -import urllib3 -try: - from urlparse import urlparse -except ImportError: - from urllib.parse import urlparse +from urllib.parse import urlparse -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway import requests - +import urllib3 +from prometheus_client import CollectorRegistry, Gauge from rucio.common.config import config_get, config_get_bool from rucio.core import monitor - from rucio.core.distance import update_distances - from rucio.db.sqla.session import BASE, get_session +from utils import common + +PrometheusPusher = common.PrometheusPusher + OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 urllib3.disable_warnings() @@ -40,10 +38,6 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": se_matrix = {} @@ -83,84 +77,85 @@ if __name__ == "__main__": UPDATE_DIST = True registry = CollectorRegistry() - g = Gauge('fts_submitted', '', labelnames=('hostname',), registry=registry) - errmsg = '' - for ftshost in FTSHOSTS.split(','): - print("=== %s ===" % ftshost) - parsed_url = 
urlparse(ftshost) - scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port - retvalue = CRITICAL - url = '%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO) - busy_channels = [] - busylimit = 5000 - for attempt in range(0, 5): - result = None - try: - result = requests.get(url, verify=False, cert=(PROXY, PROXY)) - res = result.json() - for channel in res['overview']['items']: - src = channel['source_se'] - dst = channel['dest_se'] - if (src, dst) not in se_matrix: - se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0, - 'transfer_speed': 0, 'mbps_link': 0} - for state in ['submitted', 'active', 'finished', 'failed']: + with PrometheusPusher(registry, job_name='check_fts_backlog') as prometheus_config: + prefix: str = prometheus_config['prefix'] + extra_prom_labels = prometheus_config['labels'] + labelnames = ['hostname'] + labelnames.extend(extra_prom_labels.keys()) + g = Gauge(prefix + 'fts_submitted', '', labelnames=labelnames, registry=registry) + + errmsg = '' + for ftshost in FTSHOSTS.split(','): + print("=== %s ===" % ftshost) + parsed_url = urlparse(ftshost) + scheme, hostname, port = parsed_url.scheme, parsed_url.hostname, parsed_url.port + retvalue = CRITICAL + url = '%s/fts3/ftsmon/overview?dest_se=&source_se=&time_window=1&vo=%s' % (ftshost, VO) + busy_channels = [] + busylimit = 5000 + for attempt in range(0, 5): + result = None + try: + result = requests.get(url, verify=False, cert=(PROXY, PROXY)) + res = result.json() + for channel in res['overview']['items']: + src = channel['source_se'] + dst = channel['dest_se'] + if (src, dst) not in se_matrix: + se_matrix[(src, dst)] = {'active': 0, 'submitted': 0, 'finished': 0, 'failed': 0, + 'transfer_speed': 0, 'mbps_link': 0} + for state in ['submitted', 'active', 'finished', 'failed']: + try: + se_matrix[(src, dst)][state] += channel[state] + except Exception: + pass try: - se_matrix[(src, dst)][state] += channel[state] + 
se_matrix[(src, dst)]['transfer_speed'] += channel['current'] + se_matrix[(src, dst)]['mbps_link'] += channel['current'] except Exception: pass - try: - se_matrix[(src, dst)]['transfer_speed'] += channel['current'] - se_matrix[(src, dst)]['mbps_link'] += channel['current'] - except Exception: - pass - if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit: - url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % (ftshost, VO, - src, dst) - activities = {} - try: - s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY)) - for key, val in s.json().items(): - activities[key] = val['SUBMITTED'] - except Exception as error: - pass - busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'], - 'activities': activities}) - summary = res['summary'] - hostname = hostname.replace('.', '_') - print('%s : Submitted : %s' % (hostname, summary['submitted'])) - print('%s : Active : %s' % (hostname, summary['active'])) - print('%s : Staging : %s' % (hostname, summary['staging'])) - print('%s : Started : %s' % (hostname, summary['started'])) - if busy_channels != []: - print('Busy channels (>%s submitted):' % busylimit) - for bc in busy_channels: - activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()]) - print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'], - str(activities_str))) - monitor.record_gauge('fts3.%s.submitted' % hostname, - value=(summary['submitted'] + summary['active'] - + summary['staging'] + summary['started'])) - g.labels(**{'hostname': hostname}).set((summary['submitted'] + summary['active'] + summary['staging'] + summary['started'])) - retvalue = OK - break - except Exception as error: - retvalue = CRITICAL - if result and result.status_code: - errmsg = 'Error when trying to get info from %s : HTTP status code %s. 
[%s]' % ( - ftshost, str(result.status_code), str(error)) - else: - errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error)) - if retvalue == CRITICAL: - print("All attempts failed. %s" % errmsg) - WORST_RETVALUE = max(retvalue, WORST_RETVALUE) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_fts_backlog', registry=registry) - except: - continue + if CHECK_BUSY and 'submitted' in channel and channel['submitted'] >= busylimit: + url_activities = '%s/fts3/ftsmon/config/activities/%s?source_se=%s&dest_se=%s' % ( + ftshost, VO, + src, dst) + activities = {} + try: + s = requests.get(url_activities, verify=False, cert=(PROXY, PROXY)) + for key, val in s.json().items(): + activities[key] = val['SUBMITTED'] + except Exception as error: + pass + busy_channels.append({'src': src, 'dst': dst, 'submitted': channel['submitted'], + 'activities': activities}) + summary = res['summary'] + hostname = hostname.replace('.', '_') + print('%s : Submitted : %s' % (hostname, summary['submitted'])) + print('%s : Active : %s' % (hostname, summary['active'])) + print('%s : Staging : %s' % (hostname, summary['staging'])) + print('%s : Started : %s' % (hostname, summary['started'])) + if busy_channels != []: + print('Busy channels (>%s submitted):' % busylimit) + for bc in busy_channels: + activities_str = ", ".join([("%s: %s" % (key, val)) for key, val in bc['activities'].items()]) + print(' %s to %s : %s submitted jobs (%s)' % (bc['src'], bc['dst'], bc['submitted'], + str(activities_str))) + monitor.record_gauge('fts3.%s.submitted' % hostname, + value=(summary['submitted'] + summary['active'] + + summary['staging'] + summary['started'])) + g.labels(**{'hostname': hostname}).set( + (summary['submitted'] + summary['active'] + summary['staging'] + summary['started'])) + retvalue = OK + break + except Exception as error: + retvalue = CRITICAL + if result and result.status_code: + errmsg = 'Error when trying to get 
info from %s : HTTP status code %s. [%s]' % ( + ftshost, str(result.status_code), str(error)) + else: + errmsg = 'Error when trying to get info from %s. %s' % (ftshost, str(error)) + if retvalue == CRITICAL: + print("All attempts failed. %s" % errmsg) + WORST_RETVALUE = max(retvalue, WORST_RETVALUE) if not UPDATE_DIST: sys.exit(WORST_RETVALUE) diff --git a/common/check_messages_to_submit b/common/check_messages_to_submit index 9a3eee39..fd65fad0 100755 --- a/common/check_messages_to_submit +++ b/common/check_messages_to_submit @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,20 +8,22 @@ # Authors: # - Mario Lassnig, , 2013-2014 # - Thomas Beermann, , 2019 +# - Eric Vaandering , 2022 """ Probe to check the queues of messages to submit by Hermes to the broker """ -from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway - -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,25 +34,16 @@ else: queue_sql = """SELECT COUNT(*) FROM {schema}messages""".format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - result = session.execute(queue_sql).fetchall() - print('queues.messages %s' % result[0][0]) - monitor.record_gauge(stat='queues.messages', value=result[0][0]) - Gauge('hermes_queues_messages', '', registry=registry).set(result[0][0]) - - if len(PROM_SERVERS): - for server in 
PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_messages_to_submit', registry=registry) - except: - continue + with PrometheusPusher(registry, job_name='check_messages_to_submit') as prometheus_config: + prefix: str = prometheus_config['prefix'] + result = session.execute(queue_sql).fetchall() + print('queues.messages %s' % result[0][0]) + monitor.record_gauge(stat='queues.messages', value=result[0][0]) + Gauge(prefix + 'hermes_queues_messages', '', registry=registry).set(result[0][0]) if result[0][0] > 100000: sys.exit(WARNING) diff --git a/common/check_new_dids b/common/check_new_dids index 2652aa0a..78e090fc 100755 --- a/common/check_new_dids +++ b/common/check_new_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,47 +8,40 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 -# - Eric Vaandering , 2020 +# - Eric Vaandering , 2020-2022 """ Probe to check the backlog of new dids. 
""" -from __future__ import print_function import sys import traceback -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla import models from rucio.db.sqla.session import get_session from rucio.db.sqla.util import get_count +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - query = (session.query(models.DataIdentifier.scope) - .with_hint(models.DataIdentifier, "INDEX_FFS(DIDS DIDS_IS_NEW_IDX)", 'oracle') - .filter(models.DataIdentifier.is_new.isnot(None))) - result = get_count(query) - monitor.record_gauge('transmogrifier.new_dids', value=result) - Gauge('transmogrifier_new_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_new_dids', registry=registry) - except: - continue + with PrometheusPusher(registry, job_name='check_new_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + query = (session.query(models.DataIdentifier.scope) + .with_hint(models.DataIdentifier, "INDEX_FFS(DIDS DIDS_IS_NEW_IDX)", 'oracle') + .filter(models.DataIdentifier.is_new.isnot(None))) + result = get_count(query) + monitor.record_gauge('transmogrifier.new_dids', value=result) + Gauge(prefix + 'transmogrifier_new_dids', '', registry=registry).set(result) print(result) except: diff --git a/common/check_stuck_rules b/common/check_stuck_rules index 345cf0f3..82493f42 100755 --- a/common/check_stuck_rules +++ b/common/check_stuck_rules @@ -1,4 +1,4 @@ -#!/usr/bin/env python 
+#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -7,22 +7,24 @@ # # Authors: # - Martin Barisits, , 2014 -# - Eric Vaandering, , 2019-2021 +# - Eric Vaandering, , 2019-2022 # - Thomas Beermann, , 2019 """ Probe to check the backlog of stuck rules. """ -from __future__ import print_function import sys import traceback -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -31,32 +33,23 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and (error !=\'MissingSourceReplica\' or error IS NULL)'.format( - schema=schema) - result = session.execute(sql).fetchone()[0] - monitor.record_gauge('judge.stuck_rules_without_missing_source_replica', value=result) - Gauge('judge_stuck_rules_without_missing_source_replica', '', registry=registry).set(result) - - sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and error =\'MissingSourceReplica\''.format( - schema=schema) - result = session.execute(sql).fetchone()[0] - monitor.record_gauge('judge.stuck_rules_with_missing_source_replica', value=result) - Gauge('judge_stuck_rules_with_missing_source_replica', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_stuck_rules', registry=registry) 
- except: - continue + with PrometheusPusher(registry, job_name='check_stuck_rules') as prometheus_config: + prefix: str = prometheus_config['prefix'] + + sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and (error !=\'MissingSourceReplica\' or error IS NULL)'.format(schema=schema) + result = session.execute(sql).fetchone()[0] + monitor.record_gauge('judge.stuck_rules_without_missing_source_replica', value=result) + Gauge(prefix + 'judge_stuck_rules_without_missing_source_replica', '', registry=registry).set(result) + + sql = 'SELECT COUNT(1) FROM {schema}RULES where state=\'S\' and error =\'MissingSourceReplica\''.format(schema=schema) + result = session.execute(sql).fetchone()[0] + monitor.record_gauge('judge.stuck_rules_with_missing_source_replica', value=result) + Gauge(prefix + 'judge_stuck_rules_with_missing_source_replica', '', registry=registry).set(result) + except: print(traceback.format_exc()) sys.exit(UNKNOWN) diff --git a/common/check_transfer_queues_status b/common/check_transfer_queues_status index 090f5340..81df7ccd 100755 --- a/common/check_transfer_queues_status +++ b/common/check_transfer_queues_status @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -10,20 +10,22 @@ # - Cedric Serfon, , 2014 # - Wen Guan, , 2015 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 """ Probe to check the queues of the transfer service """ -from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway - -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -57,29 +59,25 @@ FROM 
{schema}requests GROUP BY state, activity, external_host )""".format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() - g = Gauge('conveyor_queues_requests', '', labelnames=('state', 'activity', 'external_host'), registry=registry) - session = get_session() - for k in session.execute(active_queue).fetchall(): - print(k[0], k[1], end=" ") - monitor.record_gauge(stat=k[0], value=k[1]) - items = k[0].split('.') - state = items[2] - activity = items[3] - external_host = items[4] - g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k[1]) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_transfer_queues_status', registry=registry) - except: - continue + with PrometheusPusher(registry, job_name='check_transfer_queues_status') as prometheus_config: + prefix: str = prometheus_config['prefix'] + extra_prom_labels = prometheus_config['labels'] + labelnames = ['state', 'activity', 'external_host'] + labelnames.extend(extra_prom_labels.keys()) + + g = Gauge(prefix+'conveyor_queues_requests', '', labelnames=labelnames, registry=registry) + session = get_session() + for k in session.execute(active_queue).fetchall(): + print(k[0], k[1], end=" ") + monitor.record_gauge(stat=k[0], value=k[1]) + items = k[0].split('.') + state = items[2] + activity = items[3] + external_host = items[4] + g.labels(**{'activity': activity, 'state': state, 'external_host': external_host}).set(k[1]) except: sys.exit(UNKNOWN) sys.exit(OK) diff --git a/common/check_unevaluated_dids b/common/check_unevaluated_dids index 6a0872ab..430954de 100755 --- a/common/check_unevaluated_dids +++ b/common/check_unevaluated_dids @@ -8,6 +8,7 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 """ 
Probe to check the backlog of dids waiting for rule evaluation. @@ -16,12 +17,14 @@ from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway - -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -32,10 +35,6 @@ else: count_sql = 'SELECT COUNT(*) FROM {schema}updated_dids'.format(schema=schema) -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: session = get_session() @@ -43,14 +42,9 @@ if __name__ == "__main__": monitor.record_gauge(stat='judge.waiting_dids', value=result) registry = CollectorRegistry() - Gauge('judge_waiting_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_unevaluated_dids', registry=registry) - except: - continue + with PrometheusPusher(registry, job_name='check_unevaluated_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + Gauge(prefix + 'judge_waiting_dids', '', registry=registry).set(result) print(result) except: diff --git a/common/check_unlocked_replicas b/common/check_unlocked_replicas index 081d7206..f837f69a 100755 --- a/common/check_unlocked_replicas +++ b/common/check_unlocked_replicas @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,20 +8,22 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 +# - Eric Vaandering, , 2022 """ Probe to check the backlog of unlocked replicas. 
""" -from __future__ import print_function import sys -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway - -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla.session import BASE, get_session +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -40,30 +42,23 @@ if BASE.metadata.schema: else: schema = '' -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - unlocked_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null'.format(schema=schema) - result = session.execute(unlocked_sql).fetchone()[0] - monitor.record_gauge(stat='reaper.unlocked_replicas', value=result) - Gauge('reaper_unlocked_replicas', '', registry=registry).set(result) - print(result) - expired_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null and tombstone < sysdate - 2/24'.format(schema=schema) - result = session.execute(expired_sql).fetchone()[0] - monitor.record_gauge(stat='reaper.expired_replicas', value=result) - Gauge('reaper_expired_replicas', '', registry=registry).set(result) + with PrometheusPusher(registry, job_name='check_unevaluated_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + + unlocked_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null'.format(schema=schema) + result = session.execute(unlocked_sql).fetchone()[0] + monitor.record_gauge(stat='reaper.unlocked_replicas', value=result) + Gauge(prefix + 'reaper_unlocked_replicas', '', 
registry=registry).set(result) + print(result) + expired_sql = 'select /*+ index_ffs(replicas REPLICAS_TOMBSTONE_IDX) */ count(1) from {schema}replicas where tombstone is not null and tombstone < sysdate - 2/24'.format(schema=schema) + result = session.execute(expired_sql).fetchone()[0] + monitor.record_gauge(stat='reaper.expired_replicas', value=result) + Gauge(prefix + 'reaper_expired_replicas', '', registry=registry).set(result) - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_unlocked_replicas', registry=registry) - except: - continue except: sys.exit(UNKNOWN) sys.exit(OK) diff --git a/common/check_updated_dids b/common/check_updated_dids index cca213fe..ae7270a2 100755 --- a/common/check_updated_dids +++ b/common/check_updated_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,45 +8,38 @@ # Authors: # - Vincent Garonne, , 2013 # - Thomas Beermann, , 2019 -# - Eric Vaandering , 2020-2021 +# - Eric Vaandering , 2020-2022 """ Probe to check the backlog of updated dids. 
""" -from __future__ import print_function import sys import traceback -from prometheus_client import CollectorRegistry, Gauge, push_to_gateway -from rucio.common.config import config_get +from prometheus_client import CollectorRegistry, Gauge from rucio.core import monitor from rucio.db.sqla import models from rucio.db.sqla.session import get_session from rucio.db.sqla.util import get_count +from utils import common + +PrometheusPusher = common.PrometheusPusher + # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 -PROM_SERVERS = config_get('monitor', 'prometheus_servers', raise_exception=False, default='') -if PROM_SERVERS != '': - PROM_SERVERS = PROM_SERVERS.split(',') - if __name__ == "__main__": try: registry = CollectorRegistry() session = get_session() - query = session.query(models.UpdatedDID) - result = get_count(query) - monitor.record_gauge('judge.updated_dids', value=result) - Gauge('judge_updated_dids', '', registry=registry).set(result) - - if len(PROM_SERVERS): - for server in PROM_SERVERS: - try: - push_to_gateway(server.strip(), job='check_updated_dids', registry=registry) - except: - continue + with PrometheusPusher(registry, job_name='check_updated_dids') as prometheus_config: + prefix: str = prometheus_config['prefix'] + query = session.query(models.UpdatedDID) + result = get_count(query) + monitor.record_gauge('judge.updated_dids', value=result) + Gauge(prefix + 'judge_updated_dids', '', registry=registry).set(result) # created_at, count, max, min, avg, stdev = 0.0, 0.0, 0.0, 0.0, 0.0, 0.0 # result = session.execute('select * from atlas_rucio.concurency_stats where created_at > sysdate - 1/1440') From 1741d6429fa39454e94671d6f36589ae018e5d90 Mon Sep 17 00:00:00 2001 From: Eric Vaandering Date: Thu, 24 Mar 2022 15:07:46 -0500 Subject: [PATCH 2/3] Missed a python3 --- common/check_unevaluated_dids | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/common/check_unevaluated_dids b/common/check_unevaluated_dids index 
430954de..5119ce0c 100755 --- a/common/check_unevaluated_dids +++ b/common/check_unevaluated_dids @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -13,7 +13,6 @@ """ Probe to check the backlog of dids waiting for rule evaluation. """ -from __future__ import print_function import sys From 2ab81e0c7d0beefa40a28c647a07fa82bca77af5 Mon Sep 17 00:00:00 2001 From: voetberg Date: Mon, 12 Feb 2024 15:23:33 -0600 Subject: [PATCH 3/3] added prompusher --- common/check_obsolete_replicas | 136 +++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 56 deletions(-) diff --git a/common/check_obsolete_replicas b/common/check_obsolete_replicas index efd83943..5883bae8 100755 --- a/common/check_obsolete_replicas +++ b/common/check_obsolete_replicas @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # Copyright European Organization for Nuclear Research (CERN) 2013 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -8,14 +8,22 @@ # Authors: # - Vincent Garonne, , 2015 # - Cedric Serfon, , 2018 +# - Maggie Voetberg, , 2024 ''' Probe to check the backlog of obsolete replicas. ''' import sys +import traceback +from sqlalchemy.sql import text +from rucio.db.sqla.session import BASE, get_session +from utils.common import PrometheusPusher -from rucio.db.sqla.session import get_session +if BASE.metadata.schema: + schema = BASE.metadata.schema + '.' 
+else: + schema = '' # Exit statuses OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 @@ -23,62 +31,78 @@ OK, WARNING, CRITICAL, UNKNOWN = 0, 1, 2, 3 if __name__ == "__main__": try: - SESSION = get_session() - QUERY = '''BEGIN - FOR u in (SELECT - a.rse_id AS rse_id, - NVL(b.files, 0) AS files, - NVL(b.bytes, 0) AS bytes, - SYS_EXTRACT_UTC(localtimestamp) AS updated_at - FROM - ( - SELECT - id AS rse_id - FROM - atlas_rucio.rses - WHERE - deleted=0) a - LEFT OUTER JOIN - ( - SELECT - /*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */ - rse_id, - COUNT(1) AS files, - SUM(bytes) AS bytes - FROM - atlas_rucio.replicas - WHERE - ( - CASE - WHEN tombstone IS NOT NULL - THEN rse_id - END) IS NOT NULL - AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS') - GROUP BY - rse_id) b - ON - a.rse_id=b.rse_id) + session = get_session() + with PrometheusPusher() as manager: + query = '''BEGIN + FOR u in (SELECT + a.rse_id AS rse_id, + NVL(b.files, 0) AS files, + NVL(b.bytes, 0) AS bytes, + SYS_EXTRACT_UTC(localtimestamp) AS updated_at + FROM + ( + SELECT + id AS rse_id + FROM + {schema}rses + WHERE + deleted=0) a + LEFT OUTER JOIN + ( + SELECT + /*+ INDEX_FFS(replicas REPLICAS_TOMBSTONE_IDX) */ + rse_id, + COUNT(1) AS files, + SUM(bytes) AS bytes + FROM + {schema}replicas + WHERE + ( + CASE + WHEN tombstone IS NOT NULL + THEN rse_id + END) IS NOT NULL + AND tombstone=to_date('1-1-1970 00:00:00','MM-DD-YYYY HH24:Mi:SS') + GROUP BY + rse_id) b + ON + a.rse_id=b.rse_id) - LOOP - MERGE INTO atlas_rucio.RSE_USAGE - USING DUAL - ON (atlas_rucio.RSE_USAGE.rse_id = u.rse_id and source = 'obsolete') - WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) - VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at) - WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at; + LOOP + MERGE INTO {schema}RSE_USAGE + USING DUAL + ON ({schema}RSE_USAGE.rse_id = u.rse_id and source = 'obsolete') + WHEN NOT MATCHED THEN 
INSERT(rse_id, source, used, files, updated_at, created_at) + VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at) + WHEN MATCHED THEN UPDATE SET used=u.bytes, files=u.files, updated_at=u.updated_at; - MERGE INTO ATLAS_RUCIO.RSE_USAGE_HISTORY H - USING DUAL - ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at) - WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) - VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at); + MERGE INTO {schema}RSE_USAGE_HISTORY H + USING DUAL + ON (h.rse_id = u.rse_id and h.source = 'obsolete' and h.updated_at = u.updated_at) + WHEN NOT MATCHED THEN INSERT(rse_id, source, used, files, updated_at, created_at) + VALUES (u.rse_id, 'obsolete', u.bytes, u.files, u.updated_at, u.updated_at); - COMMIT; - END LOOP; -END; -''' - SESSION.execute(QUERY) - except Exception as error: - print error + COMMIT; + END LOOP; + END;'''.format(schema=schema) + + for result in session.execute(text(query)): + print(result) + + rse_id = result[0] + bytes_sum = result[2] + files_count = result[1] + + manager.gauge(name="obsolete_replicas_files.{rse}", + documentation="Probe to check the backlog of obsolete replicas.").labels(rse=rse_id).set(files_count) + + manager.gauge(name="obsolete_replicas_bytes.{rse}", + documentation="Probe to check the backlog of obsolete replicas.").labels(rse=rse_id).set(bytes_sum) + + + except: + print(traceback.format_exc()) sys.exit(UNKNOWN) + finally: + session.remove() sys.exit(OK)