Skip to content

Commit

Permalink
initial prep work to update slurm data collector to support multiple
Browse files Browse the repository at this point in the history
resource managers (e.g. slurm and flux).  Primary change is to rename
the resulting Prometheus metric and a number of variable names.

Signed-off-by: Karl W Schulz <[email protected]>
  • Loading branch information
koomie committed Jul 15, 2024
1 parent f233ebf commit 1b7a12a
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions omnistat/collector_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
# SOFTWARE.
# -------------------------------------------------------------------------------

"""SLURM data collector
"""Resource manager data collector
Implements a prometheus info metric to track SLURM job-related info. The
default resulting metric is named "slurmjob_info{}" and is always published. An
optional "slurmjob_annotations{}" metric can be published to provide
Implements a prometheus info metric to track job-related info. The
default resulting metric is named "rmsjob_info{}" and is always published. An
optional "rmsjob_annotations{}" metric can be published to provide
user-provided annotation timestamps.
"""

Expand All @@ -36,7 +36,7 @@
import os
import platform

from prometheus_client import Gauge, generate_latest, CollectorRegistry
from prometheus_client import Gauge, CollectorRegistry

import omnistat.utils as utils
from omnistat.collector_base import Collector
Expand All @@ -45,14 +45,14 @@
class SlurmJob(Collector):
def __init__(self, userMode=False, annotations=False, jobDetection=None):
logging.debug("Initializing SlurmJob data collector")
self.__prefix = "slurmjob_"
self.__prefix = "rmsjob_"
self.__userMode = userMode
self.__annotationsEnabled = annotations
self.__SLURMmetrics = {}
self.__slurmJobInfo = []
self.__RMSMetrics = {}
self.__rmsJobInfo = []
self.__lastAnnotationLabel = None
self.__slurmJobMode = jobDetection["mode"]
self.__slurmJobFile = jobDetection["file"]
self.__rmsJobMode = jobDetection["mode"]
self.__rmsJobFile = jobDetection["file"]

# setup squeue binary path to query slurm to determine node ownership
command = utils.resolvePath("squeue", "SLURM_PATH")
Expand All @@ -66,27 +66,27 @@ def __init__(self, userMode=False, annotations=False, jobDetection=None):
# cache current slurm job in user mode profiling - assumption is that it does not change
if self.__userMode is True:
# read from file if available
jobFile = self.__slurmJobFile
jobFile = self.__rmsJobFile
if os.path.isfile(jobFile):
with open(jobFile, "r") as f:
self.__slurmJobInfo = json.load(f)
logging.info("--> usermode jobinfo (from file): %s" % self.__slurmJobInfo)
self.__rmsJobInfo = json.load(f)
logging.info("--> usermode jobinfo (from file): %s" % self.__rmsJobInfo)

else:
# no job file data available: query slurm directly
# note: a longer timeout is provided since we only query once and some systems have slow
# slurm response times
logging.info("User mode collector enabled for SLURM, querying job info once at startup...")
self.__slurmJobInfo = self.querySlurmJob(timeout=15, exit_on_error=True, mode="squeue")
logging.info("--> usermode jobinfo (from slurm query): %s" % self.__slurmJobInfo)
self.__rmsJobInfo = self.querySlurmJob(timeout=15, exit_on_error=True, mode="squeue")
logging.info("--> usermode jobinfo (from slurm query): %s" % self.__rmsJobInfo)

else:
if self.__slurmJobMode == "file-based":
if self.__rmsJobMode == "file-based":
logging.info(
"collector_slurm: reading job information from prolog/epilog derived file (%s)"
% self.__slurmJobFile
% self.__rmsJobFile
)
elif self.__slurmJobMode == "squeue":
elif self.__rmsJobMode == "squeue":
logging.info("collector_slurm: will poll slurm periodicaly with squeue")
else:
logging.error("Unsupported slurm job data collection mode")
Expand Down Expand Up @@ -114,9 +114,9 @@ def querySlurmJob(self, timeout=1, exit_on_error=False, mode="squeue"):
]
results = dict(zip(keys, data))
elif mode == "file-based":
jobFileExists = os.path.isfile(self.__slurmJobFile)
jobFileExists = os.path.isfile(self.__rmsJobFile)
if jobFileExists:
with open(self.__slurmJobFile, "r") as file:
with open(self.__rmsJobFile, "r") as file:
results = json.load(file)
return results

Expand All @@ -126,34 +126,34 @@ def registerMetrics(self):
# alternate approach - define an info metric
# (https://ypereirareis.github.io/blog/2020/02/21/how-to-join-prometheus-metrics-by-label-with-promql/)
labels = ["jobid", "user", "partition", "nodes", "batchflag"]
self.__SLURMmetrics["info"] = Gauge(self.__prefix + "info", "SLURM job id", labels)
self.__RMSMetrics["info"] = Gauge(self.__prefix + "info", "SLURM job id", labels)

# metric to support user annotations
self.__SLURMmetrics["annotations"] = Gauge(
self.__RMSMetrics["annotations"] = Gauge(
self.__prefix + "annotations", "User job annotations", ["marker", "jobid"]
)

for metric in self.__SLURMmetrics:
for metric in self.__RMSMetrics:
logging.debug("--> Registered SLURM metric = %s" % metric)

def updateMetrics(self):
self.__SLURMmetrics["info"].clear()
self.__SLURMmetrics["annotations"].clear()
self.__RMSMetrics["info"].clear()
self.__RMSMetrics["annotations"].clear()
jobEnabled = False

results = None

if self.__userMode is True:
results = self.__slurmJobInfo
results = self.__rmsJobInfo
jobEnabled = True
else:
results = self.querySlurmJob(mode=self.__slurmJobMode)
results = self.querySlurmJob(mode=self.__rmsJobMode)
if results:
jobEnabled = True

# Case when SLURM job is allocated
if jobEnabled:
self.__SLURMmetrics["info"].labels(
self.__RMSMetrics["info"].labels(
jobid=results["SLURM_JOB_ID"],
user=results["SLURM_JOB_USER"],
partition=results["SLURM_JOB_PARTITION"],
Expand All @@ -176,21 +176,21 @@ def updateMetrics(self):
if self.__lastAnnotationLabel != None and (
not userFileExists or self.__lastAnnotationLabel != data["annotation"]
):
self.__SLURMmetrics["annotations"].labels(
self.__RMSMetrics["annotations"].labels(
marker=self.__lastAnnotationLabel,
jobid=results["SLURM_JOB_ID"],
).set(0)
self.__lastAnnotationLabel = None

if userFileExists:
self.__SLURMmetrics["annotations"].labels(
self.__RMSMetrics["annotations"].labels(
marker=data["annotation"],
jobid=results["SLURM_JOB_ID"],
).set(data["timestamp_secs"])
self.__lastAnnotationLabel = data["annotation"]

# Case when no job detected
else:
self.__SLURMmetrics["info"].labels(jobid="", user="", partition="", nodes="", batchflag="").set(1)
self.__RMSMetrics["info"].labels(jobid="", user="", partition="", nodes="", batchflag="").set(1)

return

0 comments on commit 1b7a12a

Please sign in to comment.