From 7e0c703562d8c72bac246e4498701fa11881b850 Mon Sep 17 00:00:00 2001
From: Christos Emmanouil
Date: Wed, 26 Jun 2024 17:30:52 +0200
Subject: [PATCH] use prometheus to expose consistency statistics

---
Note: docker/push2prometheus.py differs from the originally committed blob,
so the blob id on its "index" line is stale.

 docker/Dockerfile         |   3 +-
 docker/push2prometheus.py | 173 ++++++++++++++++++++++++++++++++++++++
 docker/rucio.cfg.j2       |  10 ++++
 docker/site.sh            |   2 +-
 site_cmp3/site_cmp3.sh    |  17 +++++-
 5 files changed, 201 insertions(+), 4 deletions(-)
 create mode 100644 docker/push2prometheus.py

diff --git a/docker/Dockerfile b/docker/Dockerfile
index 5dd2b006..afda5677 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -53,6 +53,7 @@ RUN dnf install -y python3 python3-pip git \
 RUN pip3 install --upgrade pip
 RUN pip3 install --upgrade setuptools
 RUN pip3 --no-cache-dir install SQLAlchemy pyyaml pythreader cx_Oracle j2cli
+RUN pip3 install --no-cache-dir --pre rucio[oracle,mysql,postgresql]
 RUN pip3 install rucio-clients rucio-consistency
 
 
@@ -60,7 +61,7 @@ RUN pip3 install rucio-clients rucio-consistency
 RUN mkdir -p /consistency
 RUN mkdir /root/RAL
 COPY vomses /etc
-COPY cleanup.sh run.sh site.sh unmerged_site.sh RAL_Disk_pre.sh RAL_Disk_post.sh RAL_Tape_pre.sh RAL_Tape_post.sh rucio-client.cfg /consistency/
+COPY cleanup.sh run.sh site.sh unmerged_site.sh RAL_Disk_pre.sh RAL_Disk_post.sh RAL_Tape_pre.sh RAL_Tape_post.sh rucio-client.cfg push2prometheus.py /consistency/
 
 ADD rucio.cfg.j2 /tmp
 
diff --git a/docker/push2prometheus.py b/docker/push2prometheus.py
new file mode 100644
index 00000000..5eeff3c7
--- /dev/null
+++ b/docker/push2prometheus.py
@@ -0,0 +1,173 @@
+"""Push consistency scan statistics from a JSON stats file to Prometheus.
+
+Usage: push2prometheus.py <stats.json> <scan_type>
+
+Numeric values found in the stats file are registered as gauges through the
+Rucio MetricManager and pushed to the configured Prometheus push gateways.
+"""
+
+import argparse
+import json
+import os
+import sys
+from typing import Tuple, Dict, List, Optional
+
+from rucio.common.config import config_get
+from rucio.core.monitor import MetricManager
+
+PROBES_PREFIX = config_get('monitor', 'prometheus_prefix', raise_exception=False, default='')
+probe_metrics = MetricManager(prefix=PROBES_PREFIX)
+
+# Scan type -> keys of the per-job sections of its stats file
+scan_jobs_dict = {
+    'site_cmp3': ['dbdump_before', 'scanner', 'dbdump_after', 'cmp3', 'diffs', 'missing_action', 'dark_action', 'empty_action']
+}
+
+
+def get_prometheus_config() -> Tuple[List, str, Optional[Dict]]:
+    """
+    Read the Prometheus settings from the [monitor] section of the Rucio configuration.
+
+    :return: tuple of (push gateway servers, metric prefix, grouping labels);
+             the labels are None when prometheus_labels is unset or is not a JSON object
+    """
+    prom_servers = config_get('monitor', 'prometheus_servers', raise_exception=False, default='')
+    # Ignore blanks around the commas and empty entries
+    prom_servers = [server.strip() for server in (prom_servers or '').split(',') if server.strip()]
+    prom_prefix = config_get('monitor', 'prometheus_prefix', raise_exception=False, default='')
+    prom_label_config = config_get('monitor', 'prometheus_labels', raise_exception=False, default=None)
+    prom_labels = None
+    if prom_label_config:
+        try:
+            prom_labels = json.loads(prom_label_config)
+        except ValueError as error:
+            print(f"Warning: ignoring prometheus_labels, invalid JSON: {error}", file=sys.stderr)
+        if not isinstance(prom_labels, dict):
+            # Only a JSON object can be used as the grouping key
+            prom_labels = None
+    return prom_servers, prom_prefix, prom_labels
+
+
+class PrometheusPusher:
+    """
+    A context manager to abstract the business of configuring and pushing to prometheus
+    """
+
+    def __init__(self, prefix: "Optional[str]" = PROBES_PREFIX, job_name: "Optional[str]" = None):
+        """
+        :param prefix: prefix handed to the MetricManager
+        :param job_name: job name used when pushing to the gateways
+        """
+        self.job_name = job_name
+        self.servers, _dummy, self.labels = get_prometheus_config()
+        self.prefix = prefix
+
+        # For local development the gateway can be read from the environment instead:
+        # push_gateways=[os.getenv('PROMETHEUS_PUSHGATEWAY_ENDPOINT')]
+        self.manager = MetricManager(prefix=self.prefix, push_gateways=self.servers)
+
+    def __enter__(self) -> "MetricManager":
+        """
+        Return the Rucio metrics manager
+        :return: the MetricManager on which the metrics to push are registered
+        """
+        return self.manager
+
+    def __exit__(self, exc_type, exc_value, exc_traceback):
+        """
+        Push the metrics to the gateways; an exception raised in the block is not suppressed
+        """
+        self.manager.push_metrics_to_gw(job=self.job_name, grouping_key=self.labels)
+
+
+def _is_number(value) -> bool:
+    """Tell whether a stats value can be set on a gauge (bool counts, being an int)."""
+    return isinstance(value, (int, float))
+
+
+def _load_stats(json_file) -> Optional[dict]:
+    """
+    Parse the stats from an open JSON file, closing it afterwards.
+
+    :param json_file: readable text file object
+    :return: the parsed stats, or None if they cannot be read or are not a JSON object
+    """
+    try:
+        with json_file:
+            data = json.load(json_file)
+    except OSError as error:
+        print(f"Error: Cannot read file: {json_file.name} ({error})")
+        return None
+    except ValueError:
+        # Raised for malformed JSON (json.JSONDecodeError) and undecodable text
+        print(f"Error: Invalid JSON format in file: {json_file.name}")
+        return None
+    if not isinstance(data, dict):
+        print(f"Error: Expected a JSON object in file: {json_file.name}")
+        return None
+    return data
+
+
+def _push_scan_metrics(scan_type: str, data: Optional[dict], rse: str) -> None:
+    """Push the numeric top-level stats of the scan, skipping the per-job sections."""
+    with PrometheusPusher(job_name=scan_type) as manager:
+        for stat_key, stat_value in (data or {}).items():
+            if stat_key not in scan_jobs_dict[scan_type] and _is_number(stat_value):
+                manager.gauge(name=".".join([stat_key, '{rse}']), documentation=f'{stat_key} of {scan_type}').labels(rse=rse).set(stat_value)
+
+
+def _push_root_metrics(manager, job_key: str, stat_key: str, roots: list, rse: str, server: str) -> None:
+    """Register the failure flag and the numeric stats of every scanned root."""
+    for root_dict in roots:
+        if not isinstance(root_dict, dict):
+            continue
+        root = root_dict.get('root', '')
+        labels = {'rse': rse, 'server': server, 'root': root}
+        root_failed = 1 if root_dict.get('root_failed', None) else 0
+        manager.gauge(name=".".join([stat_key, 'root_failed', '{rse}', '{server}', '{root}']), documentation=f'Root_failed of {job_key} for given RSE and root (1: True, 0: False)').labels(**labels).set(root_failed)
+
+        for root_stat_key, root_stat_value in root_dict.items():
+            if _is_number(root_stat_value):
+                manager.gauge(name=".".join([stat_key, root_stat_key, '{rse}', '{server}', '{root}']), documentation=f'{root_stat_key} of {job_key} for given RSE and root').labels(**labels).set(root_stat_value)
+
+
+def _push_job_metrics(job_key: str, job_dict: Optional[dict], rse: str) -> None:
+    """Push the health gauge and the numeric stats of one internal scan job."""
+    with PrometheusPusher(job_name=job_key) as manager:
+        health_value = 1 if job_dict and job_dict.get('status', None) == "done" else 0
+        manager.gauge(name=".".join(['health', '{rse}']), documentation=f'Health status of {job_key} (1: Good, 0: Bad)').labels(rse=rse).set(health_value)
+
+        # Metrics are not removed from the pushgateway
+        # If a metric is not set when a new job is updated, its value in pushgateway will become NaN
+        # For these reasons no other checks are needed
+
+        for stat_key, stat_value in (job_dict or {}).items():
+            if _is_number(stat_value):
+                manager.gauge(name=".".join([stat_key, '{rse}']), documentation=f'{stat_key} of {job_key}').labels(rse=rse).set(stat_value)
+
+            if job_key == 'scanner' and stat_key == 'roots' and isinstance(stat_value, list):
+                _push_root_metrics(manager, job_key, stat_key, stat_value, rse, job_dict.get('server', ''))
+
+
+def main() -> None:
+    """Parse the command line, load the stats file and push its metrics."""
+    parser = argparse.ArgumentParser(description="Push metrics from a JSON file to prometheus")
+    parser.add_argument("filename", type=argparse.FileType('r'), help="Path to the JSON file")
+    parser.add_argument("scan_type", choices=list(scan_jobs_dict.keys()), help=f"Scan types. Allowed values: {list(scan_jobs_dict.keys())}")
+    args = parser.parse_args()
+
+    data = _load_stats(args.filename)
+    # Without usable stats the RSE is unknown: keep an empty label so the health gauges are still pushed
+    rse = data.get('rse', '') if data else ''
+
+    # Push metrics relative to the scan
+    _push_scan_metrics(args.scan_type, data, rse)
+
+    # Push metrics relative to internal scan jobs
+    for job_key in scan_jobs_dict[args.scan_type]:
+        job_dict = data.get(job_key, None) if data else None
+        _push_job_metrics(job_key, job_dict if isinstance(job_dict, dict) else None, rse)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/docker/rucio.cfg.j2 b/docker/rucio.cfg.j2
index 417206c7..122dd28c 100755
--- a/docker/rucio.cfg.j2
+++ b/docker/rucio.cfg.j2
@@ -21,6 +21,9 @@ pool_recycle = {{ RUCIO_CFG_DATABASE_POOL_RECYCLE | default('600') }}
 carbon_server = {{ RUCIO_CFG_MONITOR_CARBON_SERVER | default('localhost') }}
 carbon_port = {{ RUCIO_CFG_MONITOR_CARBON_PORT | default('8125') }}
 user_scope = {{ RUCIO_CFG_MONITOR_USER_SCOPE | default('default_docker') }}
+prometheus_servers = {{ PROMETHEUS_SERVERS | default('') }}
+prometheus_prefix = {{ PROMETHEUS_PREFIX | default('') }}
+prometheus_labels = {{ PROMETHEUS_LABELS | default('') }}
 
 [trace]
 tracedir = {{ RUCIO_CFG_TRACE_TRACEDIR | default('/var/log/rucio/trace') }}
@@ -52,3 +55,10 @@ usercert = {{ RUCIO_CFG_WEBUI_USERCERT | default('/opt/rucio/etc/usercert_with_k
 [oidc]
 idpsecrets = {{ RUCIO_CFG_OIDC_IDPSECRETS | default('/opt/rucio/etc/idpsecrets.json') }}
 {% if RUCIO_CFG_OIDC_ADMIN_ISSUER is defined %}admin_issuer = {{ RUCIO_CFG_OIDC_ADMIN_ISSUER }}{% endif %}
+
+[client]
+rucio_host = {{ RUCIO_HOST | default('') }}
+auth_host = {{ AUTH_HOST | default('') }}
+auth_type = {{ AUTH_TYPE | default('') }}
+ca_cert = {{ CA_CERT | default('') }}
+request_retries = {{ REQUEST_RETRIES | default('3') }}
\ No newline at end of file
diff --git a/docker/site.sh b/docker/site.sh
index 39b88344..9bade5c7 100755
--- a/docker/site.sh
+++ b/docker/site.sh
@@ -5,7 +5,7 @@ cd /consistency/cms_consistency/site_cmp3
 cp /opt/proxy/x509up /tmp/x509up
 chmod 600 /tmp/x509up
 export X509_USER_PROXY=/tmp/x509up
-export RUCIO_CONFIG=/consistency/rucio-client.cfg
+export RUCIO_CONFIG=/opt/rucio/etc/rucio.cfg
 export PYTHON=python3
 
 cfg_src=/config/config.yaml
diff --git a/site_cmp3/site_cmp3.sh b/site_cmp3/site_cmp3.sh
index c87c4834..bb9a3763 100755
--- a/site_cmp3/site_cmp3.sh
+++ b/site_cmp3/site_cmp3.sh
@@ -108,6 +108,7 @@ cat > ${stats} <<_EOF_
     "driver_version": "${version}",
     "python_version": "${python_version}",
     "end_time": ${timestamp}.0,
+    "elapsed_time": ${timestamp}.0,
     "disabled": $disabled
 }
 _EOF_
@@ -211,7 +212,7 @@ echo Dark files ...
 echo
 d_action_list=${out}/${RSE}_${now}_D_action.list
 dark_action_errors=${out}/${RSE}_${now}_dark_action.errors
-$python actions/declare_dark.py -a root -o ${d_action_list} -c ${merged_config_file} -s $stats $out $RSE 2>> ${dark_action_errors}
+$python actions/declare_dark.py -a root -o ${d_action_list} -c ${merged_config_file} -s $stats $out $RSE 2>> ${dark_action_errors}
 
 #
 # 6. Remove empty directories
@@ -224,7 +225,19 @@ ed_action_errors=${out}/${RSE}_${now}_ED_action.errors
 $python actions/remove_empty_dirs.py -s $stats -c ${merged_config_file} -L 10000 $out $RSE 2> $ed_action_errors
 
 end_time=`date -u +%s`
+elapsed_time=$((end_time - timestamp))
 
 rce_update_stats $stats << _EOF_
-{ "end_time":${end_time}.0 }
+{
+    "end_time": ${end_time}.0,
+    "elapsed_time": ${elapsed_time}.0
+}
 _EOF_
+
+#
+# 7. Push2Prometheus
+#
+echo
+echo Pushing stats...
+echo
+$python /consistency/push2prometheus.py $stats "site_cmp3"