From d7f7827b15ef113aed51ccd504fee42f6796f5ce Mon Sep 17 00:00:00 2001 From: Karl W Schulz Date: Mon, 8 Jul 2024 15:23:38 -0500 Subject: [PATCH] applying a code formatting pass to make the bots happy Signed-off-by: Karl W Schulz --- docs/conf.py | 2 +- omnistat-annotate | 12 +- omnistat-monitor | 12 +- omnistat-query | 12 +- omnistat-slurm-env | 12 +- omnistat-usermode | 12 +- omnistat/annotate.py | 29 ++- omnistat/collector_base.py | 19 +- omnistat/collector_slurm.py | 57 ++--- omnistat/collector_smi.py | 131 +++++----- omnistat/collector_smi_process.py | 10 +- omnistat/collector_smi_v2.py | 35 +-- omnistat/monitor.py | 100 ++++---- omnistat/node_monitoring.py | 29 ++- omnistat/omni_util.py | 13 +- omnistat/query.py | 404 ++++++++++++++++-------------- omnistat/slurm_env.py | 16 +- omnistat/utils.py | 44 ++-- test/test_integration.py | 8 +- 19 files changed, 510 insertions(+), 447 deletions(-) diff --git a/docs/conf.py b/docs/conf.py index 30653981..7e3bbd33 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -136,7 +136,7 @@ def install(package): # -- Options for HTMLHelp output --------------------------------------------- # Output file base name for HTML help builder. -#htmlhelp_basename = "Omniper" +# htmlhelp_basename = "Omniper" html_logo = "images/amd-header-logo.svg" html_theme_options = { diff --git a/omnistat-annotate b/omnistat-annotate index b3277194..8d763b7c 100755 --- a/omnistat-annotate +++ b/omnistat-annotate @@ -1,19 +1,19 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 # ------------------------------------------------------------------------------- # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -36,5 +36,5 @@ except: print("Unable to load omnistat.annotate. Please verify installation.") sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/omnistat-monitor b/omnistat-monitor index bd78228a..57b2368e 100755 --- a/omnistat-monitor +++ b/omnistat-monitor @@ -1,19 +1,19 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 # ------------------------------------------------------------------------------- # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -36,5 +36,5 @@ except: print("Unable to load omnistat.node_monitoring. Please verify installation.") sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/omnistat-query b/omnistat-query index 40e2554f..34eb8749 100755 --- a/omnistat-query +++ b/omnistat-query @@ -1,19 +1,19 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 # ------------------------------------------------------------------------------- # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -36,5 +36,5 @@ except: print("Unable to load omnistat.query. Please verify installation.") sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/omnistat-slurm-env b/omnistat-slurm-env index 08e42edf..7b3dcb92 100755 --- a/omnistat-slurm-env +++ b/omnistat-slurm-env @@ -1,19 +1,19 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 # ------------------------------------------------------------------------------- # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -36,5 +36,5 @@ except: print("Unable to load omnistat.slurm_env. Please verify installation.") sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/omnistat-usermode b/omnistat-usermode index 642d3f74..2bfae06d 100755 --- a/omnistat-usermode +++ b/omnistat-usermode @@ -1,19 +1,19 @@ -#!/usr/bin/env python3 +#!/usr/bin/env python3 # ------------------------------------------------------------------------------- # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -36,5 +36,5 @@ except: print("Unable to load omnistat.omni_util. Please verify installation.") sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": sys.exit(main()) diff --git a/omnistat/annotate.py b/omnistat/annotate.py index 2aa0a567..6f7d136b 100755 --- a/omnistat/annotate.py +++ b/omnistat/annotate.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -39,17 +39,18 @@ import json import os -class omnistat_annotate(): + +class omnistat_annotate: def __init__(self): - self.filename="/tmp/omnistat_" + os.environ.get('USER') + "_annotate.json" + self.filename = "/tmp/omnistat_" + os.environ.get("USER") + "_annotate.json" - def start(self,label): + def start(self, label): data = {} data["annotation"] = label data["timestamp_secs"] = int(time.time()) - with open(self.filename,"w") as outfile: - outfile.write(json.dumps(data,indent=4)) + with open(self.filename, "w") as outfile: + outfile.write(json.dumps(data, indent=4)) outfile.write("\n") return @@ -58,14 +59,15 @@ def stop(self): os.remove(self.filename) return + def main(): parser = argparse.ArgumentParser() - parser.add_argument("--mode", choices = ['start','stop'],help="annotation mode", required=True) + parser.add_argument("--mode", choices=["start", "stop"], help="annotation mode", required=True) parser.add_argument("--text", help="desired annotation", required=False) args = parser.parse_args() - if args.mode == 'start' and args.text is None: - parser.error("The --text option is required for \"start\" mode.") + if args.mode == "start" and args.text is None: + parser.error('The --text option is required for "start" mode.') annotate = omnistat_annotate() @@ -74,5 +76,6 @@ def main(): else: annotate.stop() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/omnistat/collector_base.py b/omnistat/collector_base.py index 9472f2ee..246f910e 100644 --- a/omnistat/collector_base.py +++ b/omnistat/collector_base.py @@ -1,18 +1,18 @@ # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -24,22 +24,21 @@ # Prometheus data collector for HPC systems. # -# Base Collector class - defines required methods for all metric collectors +# Base Collector class - defines required methods for all metric collectors # implemented as a child class. -#-- +# -- from abc import ABC, abstractmethod + class Collector(ABC): # Required methods to be implemented by child classes @abstractmethod def registerMetrics(self): - """Defines desired metrics to monitor with Prometheus. Called once during initialization. - """ + """Defines desired metrics to monitor with Prometheus. Called once during initialization.""" pass @abstractmethod def updateMetrics(self): - """Updates defined metrics with latest values. Called at every polling interval. - """ + """Updates defined metrics with latest values. Called at every polling interval.""" pass diff --git a/omnistat/collector_slurm.py b/omnistat/collector_slurm.py index fbc67874..d20648d5 100644 --- a/omnistat/collector_slurm.py +++ b/omnistat/collector_slurm.py @@ -41,8 +41,9 @@ import omnistat.utils as utils from omnistat.collector_base import Collector + class SlurmJob(Collector): - def __init__(self,userMode=False,annotations=False,jobDetection=None): + def __init__(self, userMode=False, annotations=False, jobDetection=None): logging.debug("Initializing SlurmJob data collector") self.__prefix = "slurmjob_" self.__userMode = userMode @@ -50,18 +51,14 @@ def __init__(self,userMode=False,annotations=False,jobDetection=None): self.__SLURMmetrics = {} self.__slurmJobInfo = [] self.__lastAnnotationLabel = None - self.__slurmJobMode = jobDetection['mode'] - self.__slurmJobFile = jobDetection['file'] + self.__slurmJobMode = jobDetection["mode"] + self.__slurmJobFile = jobDetection["file"] # setup squeue binary path to query slurm to determine node ownership command = utils.resolvePath("squeue", "SLURM_PATH") # command-line flags for use with squeue to obtained desired metrics hostname = platform.node().split(".", 1)[0] - flags = ( - "-w " - + hostname - + " -h --Format=JobID::,UserName::,Partition::,NumNodes::,BatchFlag" - ) + flags = "-w " + hostname + " -h --Format=JobID::,UserName::,Partition::,NumNodes::,BatchFlag" # cache query command with options self.__squeue_query = [command] + flags.split() logging.debug("sqeueue_exec = %s" % self.__squeue_query) @@ -71,7 +68,7 @@ def __init__(self,userMode=False,annotations=False,jobDetection=None): # read from file if available jobFile = self.__slurmJobFile if os.path.isfile(jobFile): - with open(jobFile,'r') as f: + with open(jobFile, "r") as f: self.__slurmJobInfo = json.load(f) logging.info("--> usermode jobinfo (from file): %s" % self.__slurmJobInfo) @@ -80,18 +77,21 @@ def __init__(self,userMode=False,annotations=False,jobDetection=None): # note: a longer timeout is provided since we only query once and some systems have slow # slurm response times logging.info("User mode collector enabled for SLURM, querying job info once at startup...") - self.__slurmJobInfo = self.querySlurmJob(timeout=15,exit_on_error=True,mode='squeue') + self.__slurmJobInfo = self.querySlurmJob(timeout=15, exit_on_error=True, mode="squeue") logging.info("--> usermode jobinfo (from slurm query): %s" % self.__slurmJobInfo) else: - if self.__slurmJobMode == 'file-based': - logging.info("collector_slurm: reading job information from prolog/epilog derived file (%s)" % self.__slurmJobFile) - elif self.__slurmJobMode == 'squeue': + if self.__slurmJobMode == "file-based": + logging.info( + "collector_slurm: reading job information from prolog/epilog derived file (%s)" + % self.__slurmJobFile + ) + elif self.__slurmJobMode == "squeue": logging.info("collector_slurm: will poll slurm periodicaly with squeue") else: logging.error("Unsupported slurm job data collection mode") - def querySlurmJob(self,timeout=1,exit_on_error=False,mode='squeue'): + def querySlurmJob(self, timeout=1, exit_on_error=False, mode="squeue"): """ Query SLURM and return job info for local host. Supports two query modes: squeue call and read from file. @@ -100,19 +100,25 @@ def querySlurmJob(self,timeout=1,exit_on_error=False,mode='squeue'): """ results = {} - if mode == 'squeue': - data = utils.runShellCommand(self.__squeue_query,timeout=timeout,exit_on_error=exit_on_error) + if mode == "squeue": + data = utils.runShellCommand(self.__squeue_query, timeout=timeout, exit_on_error=exit_on_error) # squeue query output format: JOBID:USER:PARTITION:NUM_NODES:BATCHFLAG if data.stdout.strip(): data = data.stdout.strip().split(":") - keys = ["SLURM_JOB_ID","SLURM_JOB_USER","SLURM_JOB_PARTITION","SLURM_JOB_NUM_NODES","SLURM_JOB_BATCHMODE"] - results = dict(zip(keys,data)) - elif mode == 'file-based': + keys = [ + "SLURM_JOB_ID", + "SLURM_JOB_USER", + "SLURM_JOB_PARTITION", + "SLURM_JOB_NUM_NODES", + "SLURM_JOB_BATCHMODE", + ] + results = dict(zip(keys, data)) + elif mode == "file-based": jobFileExists = os.path.isfile(self.__slurmJobFile) if jobFileExists: with open(self.__slurmJobFile, "r") as file: results = json.load(file) - return(results) + return results def registerMetrics(self): """Register metrics of interest""" @@ -120,9 +126,7 @@ def registerMetrics(self): # alternate approach - define an info metric # (https://ypereirareis.github.io/blog/2020/02/21/how-to-join-prometheus-metrics-by-label-with-promql/) labels = ["jobid", "user", "partition", "nodes", "batchflag"] - self.__SLURMmetrics["info"] = Gauge( - self.__prefix + "info", "SLURM job id", labels - ) + self.__SLURMmetrics["info"] = Gauge(self.__prefix + "info", "SLURM job id", labels) # metric to support user annotations self.__SLURMmetrics["annotations"] = Gauge( @@ -170,8 +174,7 @@ def updateMetrics(self): # 1. Previous annotation stopped (file no longer present) # 2. There is a new annotation (label has changed) if self.__lastAnnotationLabel != None and ( - not userFileExists - or self.__lastAnnotationLabel != data["annotation"] + not userFileExists or self.__lastAnnotationLabel != data["annotation"] ): self.__SLURMmetrics["annotations"].labels( marker=self.__lastAnnotationLabel, @@ -188,8 +191,6 @@ def updateMetrics(self): # Case when no job detected else: - self.__SLURMmetrics["info"].labels( - jobid="", user="", partition="", nodes="", batchflag="" - ).set(1) + self.__SLURMmetrics["info"].labels(jobid="", user="", partition="", nodes="", batchflag="").set(1) return diff --git a/omnistat/collector_smi.py b/omnistat/collector_smi.py index 70892b34..80811d36 100644 --- a/omnistat/collector_smi.py +++ b/omnistat/collector_smi.py @@ -1,18 +1,18 @@ # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -48,8 +48,8 @@ from omnistat.collector_base import Collector from omnistat.utils import gpu_index_mapping -rsmi_clk_names_dict = {'sclk': 0x0, 'fclk': 0x1, 'dcefclk': 0x2,\ - 'socclk': 0x3, 'mclk': 0x4} +rsmi_clk_names_dict = {"sclk": 0x0, "fclk": 0x1, "dcefclk": 0x2, "socclk": 0x3, "mclk": 0x4} + def get_rsmi_frequencies_type(rsmiVersion): """ @@ -68,35 +68,48 @@ def get_rsmi_frequencies_type(rsmiVersion): RSMI_MAX_NUM_FREQUENCIES = 32 class rsmi_frequencies_t(ctypes.Structure): - _fields_ = [('num_supported', ctypes.c_int32), - ('current', ctypes.c_uint32), - ('frequency', ctypes.c_uint64 * RSMI_MAX_NUM_FREQUENCIES)] + _fields_ = [ + ("num_supported", ctypes.c_int32), + ("current", ctypes.c_uint32), + ("frequency", ctypes.c_uint64 * RSMI_MAX_NUM_FREQUENCIES), + ] + return rsmi_frequencies_t() else: logging.info("SMI version >= 6") RSMI_MAX_NUM_FREQUENCIES = 33 + class rsmi_frequencies_t(ctypes.Structure): - _fields_ = [('has_deep_sleep', ctypes.c_bool), - ('num_supported', ctypes.c_int32), - ('current', ctypes.c_uint32), - ('frequency', ctypes.c_uint64 * RSMI_MAX_NUM_FREQUENCIES)] + _fields_ = [ + ("has_deep_sleep", ctypes.c_bool), + ("num_supported", ctypes.c_int32), + ("current", ctypes.c_uint32), + ("frequency", ctypes.c_uint64 * RSMI_MAX_NUM_FREQUENCIES), + ] + return rsmi_frequencies_t() + class rsmi_power_type_t(ctypes.c_int): - RSMI_AVERAGE_POWER = 0, - RSMI_CURRENT_POWER = 1, - RSMI_INVALID_POWER = 0xFFFFFFFF + RSMI_AVERAGE_POWER = (0,) + RSMI_CURRENT_POWER = (1,) + RSMI_INVALID_POWER = 0xFFFFFFFF + class rsmi_version_t(ctypes.Structure): - _fields_ = [('major', ctypes.c_uint32), - ('minor', ctypes.c_uint32), - ('patch', ctypes.c_uint32), - ('build', ctypes.c_char_p)] + _fields_ = [ + ("major", ctypes.c_uint32), + ("minor", ctypes.c_uint32), + ("patch", ctypes.c_uint32), + ("build", ctypes.c_char_p), + ] + + +# -- -#-- class ROCMSMI(Collector): - def __init__(self,rocm_path="/opt/rocm"): + def __init__(self, rocm_path="/opt/rocm"): logging.debug("Initializing ROCm SMI data collector") self.__prefix = "rocm_" @@ -108,13 +121,13 @@ def __init__(self,rocm_path="/opt/rocm"): # initialize smi library ret_init = self.__libsmi.rsmi_init(0) - assert(ret_init == 0) + assert ret_init == 0 logging.info("SMI library API initialized") # cache smi library version verInfo = rsmi_version_t() ret = self.__libsmi.rsmi_version_get(ctypes.byref(verInfo)) - self.__smiVersion = {"major":verInfo.major,"minor":verInfo.minor,"patch":verInfo.patch} + self.__smiVersion = {"major": verInfo.major, "minor": verInfo.minor, "patch": verInfo.patch} self.__rsmi_frequencies_type = get_rsmi_frequencies_type(self.__smiVersion) @@ -122,7 +135,7 @@ def __init__(self,rocm_path="/opt/rocm"): logging.error("") logging.error("ERROR: Unable to load SMI library.") logging.error("--> looking for %s" % smi_lib) - logging.error("--> please verify path and set \"rocm_path\" in runtime config file if necesssary.") + logging.error('--> please verify path and set "rocm_path" in runtime config file if necesssary.') logging.error("") sys.exit(4) @@ -141,24 +154,21 @@ def registerMetrics(self): numDevices = ctypes.c_uint32(0) ret = self.__libsmi.rsmi_num_monitor_devices(ctypes.byref(numDevices)) - assert(ret == 0) + assert ret == 0 logging.info("Number of GPU devices = %i" % numDevices.value) # register number of GPUs - numGPUs_metric = Gauge( - self.__prefix + "num_gpus", "# of GPUS available on host" - ) + numGPUs_metric = Gauge(self.__prefix + "num_gpus", "# of GPUS available on host") numGPUs_metric.set(numDevices.value) self.__num_gpus = numDevices.value - # determine GPU index mapping (ie. map kfd indices used by SMI lib to that of HIP_VISIBLE_DEVICES) bdfid = ctypes.c_int64(0) bdfMapping = {} for i in range(self.__num_gpus): device = ctypes.c_uint32(i) - ret = self.__libsmi.rsmi_dev_pci_id_get(device,ctypes.byref(bdfid)) - assert(ret == 0) + ret = self.__libsmi.rsmi_dev_pci_id_get(device, ctypes.byref(bdfid)) + assert ret == 0 bdfMapping[i] = bdfid.value self.__indexMapping = gpu_index_mapping(bdfMapping, self.__num_gpus) @@ -168,7 +178,9 @@ def registerMetrics(self): # temperature self.registerGPUMetric(self.__prefix + "temperature_edge_celsius", "gauge", "Temperature (Sensor edge) (C)") # power - self.registerGPUMetric(self.__prefix + "average_socket_power_watts", "gauge", "Average Graphics Package Power (W)") + self.registerGPUMetric( + self.__prefix + "average_socket_power_watts", "gauge", "Average Graphics Package Power (W)" + ) # clock speeds self.registerGPUMetric(self.__prefix + "sclk_clock_mhz", "gauge", "current sclk clock speed (Mhz)") self.registerGPUMetric(self.__prefix + "mclk_clock_mhz", "gauge", "current mclk clock speed (Mhz)") @@ -176,8 +188,8 @@ def registerMetrics(self): self.registerGPUMetric(self.__prefix + "vram_total_bytes", "gauge", "VRAM Total Memory (B)") self.registerGPUMetric(self.__prefix + "vram_used_percentage", "gauge", "VRAM Memory in Use (%)") # utilization - self.registerGPUMetric(self.__prefix + "utilization_percentage","gauge","GPU use (%)") - + self.registerGPUMetric(self.__prefix + "utilization_percentage", "gauge", "GPU use (%)") + return def updateMetrics(self): @@ -189,16 +201,12 @@ def updateMetrics(self): def registerGPUMetric(self, metricName, type, description): if metricName in self.__GPUmetrics: - logging.error( - "Ignoring duplicate metric name addition: %s" % (name) - ) + logging.error("Ignoring duplicate metric name addition: %s" % (name)) return if type == "gauge": - self.__GPUmetrics[metricName] = Gauge(metricName, description,labelnames=["card"]) + self.__GPUmetrics[metricName] = Gauge(metricName, description, labelnames=["card"]) - logging.info( - "--> [registered] %s -> %s (gauge)" % (metricName, description) - ) + logging.info("--> [registered] %s -> %s (gauge)" % (metricName, description)) else: logging.error("Ignoring unknown metric type -> %s" % type) return @@ -209,69 +217,66 @@ def collect_data_incremental(self): # --- temperature = ctypes.c_int64(0) - temp_metric = ctypes.c_int32(0) # 0=RSMI_TEMP_CURRENT + temp_metric = ctypes.c_int32(0) # 0=RSMI_TEMP_CURRENT temp_location = ctypes.c_int32(0) # 0=RSMI_TEMP_TYPE_EDGE power = ctypes.c_uint64(0) power_type = rsmi_power_type_t() # freq = rsmi_frequencies_t() freq = self.__rsmi_frequencies_type - freq_system_clock = 0 # 0=RSMI_CLK_TYPE_SYS - freq_mem_clock = 4 # 4=RSMI_CLK_TYPE_MEM + freq_system_clock = 0 # 0=RSMI_CLK_TYPE_SYS + freq_mem_clock = 4 # 4=RSMI_CLK_TYPE_MEM vram_total = ctypes.c_uint64(0) - vram_used = ctypes.c_uint64(0) + vram_used = ctypes.c_uint64(0) utilization = ctypes.c_uint32(0) for i in range(self.__num_gpus): - + device = ctypes.c_uint32(i) gpuLabel = self.__indexMapping[i] - #-- + # -- # temperature [millidegrees Celcius, converted to degrees Celcius] metric = self.__prefix + "temperature_edge_celsius" - ret = self.__libsmi.rsmi_dev_temp_metric_get(device, - temp_location, - temp_metric, - ctypes.byref(temperature)) + ret = self.__libsmi.rsmi_dev_temp_metric_get(device, temp_location, temp_metric, ctypes.byref(temperature)) self.__GPUmetrics[metric].labels(card=gpuLabel).set(temperature.value / 1000.0) - #-- + # -- # average socket power [micro Watts, converted to Watts] metric = self.__prefix + "average_socket_power_watts" if self.__smiVersion["major"] < 6: ret = self.__libsmi.rsmi_dev_power_ave_get(device, 0, ctypes.byref(power)) else: - ret = self.__libsmi.rsmi_dev_power_get(device,ctypes.byref(power),ctypes.byref(power_type)) + ret = self.__libsmi.rsmi_dev_power_get(device, ctypes.byref(power), ctypes.byref(power_type)) if ret == 0: self.__GPUmetrics[metric].labels(card=gpuLabel).set(power.value / 1000000.0) else: self.__GPUmetrics[metric].labels(card=gpuLabel).set(0.0) - #-- + # -- # clock speeds [Hz, converted to megaHz] metric = self.__prefix + "sclk_clock_mhz" - ret = self.__libsmi.rsmi_dev_gpu_clk_freq_get(device,freq_system_clock, ctypes.byref(freq)) + ret = self.__libsmi.rsmi_dev_gpu_clk_freq_get(device, freq_system_clock, ctypes.byref(freq)) self.__GPUmetrics[metric].labels(card=gpuLabel).set(freq.frequency[freq.current] / 1000000.0) - + metric = self.__prefix + "mclk_clock_mhz" - ret = self.__libsmi.rsmi_dev_gpu_clk_freq_get(device,freq_mem_clock, ctypes.byref(freq)) + ret = self.__libsmi.rsmi_dev_gpu_clk_freq_get(device, freq_mem_clock, ctypes.byref(freq)) self.__GPUmetrics[metric].labels(card=gpuLabel).set(freq.frequency[freq.current] / 1000000.0) - #-- + # -- # gpu memory [total_vram in bytes] metric = self.__prefix + "vram_total_bytes" - ret = self.__libsmi.rsmi_dev_memory_total_get(device,0x0,ctypes.byref(vram_total)) + ret = self.__libsmi.rsmi_dev_memory_total_get(device, 0x0, ctypes.byref(vram_total)) self.__GPUmetrics[metric].labels(card=gpuLabel).set(vram_total.value) metric = self.__prefix + "vram_used_percentage" - ret = self.__libsmi.rsmi_dev_memory_usage_get(device,0x0,ctypes.byref(vram_used)) + ret = self.__libsmi.rsmi_dev_memory_usage_get(device, 0x0, ctypes.byref(vram_used)) percentage = round(100.0 * vram_used.value / vram_total.value, 4) self.__GPUmetrics[metric].labels(card=gpuLabel).set(percentage) - #-- + # -- # utilization metric = self.__prefix + "utilization_percentage" - ret = self.__libsmi.rsmi_dev_busy_percent_get(device,ctypes.byref(utilization)) + ret = self.__libsmi.rsmi_dev_busy_percent_get(device, ctypes.byref(utilization)) self.__GPUmetrics[metric].labels(card=gpuLabel).set(utilization.value) return diff --git a/omnistat/collector_smi_process.py b/omnistat/collector_smi_process.py index 0d95ca34..7fc117e5 100644 --- a/omnistat/collector_smi_process.py +++ b/omnistat/collector_smi_process.py @@ -1,18 +1,18 @@ # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -54,7 +54,7 @@ def get_gpu_processes(device): # Catch all for unsupported rocm version for process info return result # Ignore the Python process itself for the reading - if p['name'] == 'python3' and (p['mem'] == 4096 or p["memory_usage"]["vram_mem"] == 12288): + if p["name"] == "python3" and (p["mem"] == 4096 or p["memory_usage"]["vram_mem"] == 12288): continue result.append(p) return result diff --git a/omnistat/collector_smi_v2.py b/omnistat/collector_smi_v2.py index cf40eed1..ad9b1c4e 100644 --- a/omnistat/collector_smi_v2.py +++ b/omnistat/collector_smi_v2.py @@ -1,18 +1,18 @@ # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -47,6 +47,7 @@ from prometheus_client import Gauge from omnistat.utils import convert_bdf_to_gpuid, gpu_index_mapping + def get_gpu_metrics(device): result = smi.amdsmi_get_gpu_metrics_info(device) for k, v in result.items(): @@ -72,20 +73,22 @@ def get_gpu_metrics(device): result[k] = 0 return result + def check_min_version(minVersion): localVer = smi.amdsmi_get_lib_version() - localVerString = '.'.join([str(localVer["year"]),str(localVer["major"]),str(localVer["minor"])]) + localVerString = ".".join([str(localVer["year"]), str(localVer["major"]), str(localVer["minor"])]) vmin = packaging.version.Version(minVersion) vloc = packaging.version.Version(localVerString) if vloc < vmin: logging.error("") logging.error("ERROR: Minimum amdsmi version not met.") - logging.error("--> Detected version = %s (>= %s required)" % (vloc,vmin)) + logging.error("--> Detected version = %s (>= %s required)" % (vloc, vmin)) logging.error("") sys.exit(4) else: logging.info("--> library version = %s" % vloc) + class AMDSMI(Collector): def __init__(self): logging.debug("Initializing AMD SMI data collector") @@ -112,7 +115,8 @@ def registerMetrics(self): # number of GPUs numGPUs_metric = Gauge( - self.__prefix + "num_gpus", "# of GPUS available on host", + self.__prefix + "num_gpus", + "# of GPUS available on host", ) numGPUs_metric.set(self.__num_gpus) @@ -127,17 +131,21 @@ def registerMetrics(self): # Define mapping from amdsmi variable names to omnistat metric, incuding units where appropriate self.__metricMapping = { # core GPU metric definitions - "average_gfx_activity" : "utilization_percentage", + "average_gfx_activity": "utilization_percentage", "vram_total": "vram_total_bytes", - "average_socket_power" : "average_socket_power_watts", + "average_socket_power": "average_socket_power_watts", "temperature_edge": "temperature_edge_celsius", "current_gfxclks": "sclk_clock_mhz", - "average_uclk_frequency": "mclk_clock_mhz" + "average_uclk_frequency": "mclk_clock_mhz", } # Register memory related metrics - self.__GPUMetrics["vram_total_bytes"] = Gauge(self.__prefix + "vram_total_bytes","VRAM Memory in Use (%)",labelnames=["card"]) - self.__GPUMetrics["vram_used_percentage"] = Gauge(self.__prefix + "vram_used_percentage","VRAM Memory in Use (%)",labelnames=["card"]) + self.__GPUMetrics["vram_total_bytes"] = Gauge( + self.__prefix + "vram_total_bytes", "VRAM Memory in Use (%)", labelnames=["card"] + ) + self.__GPUMetrics["vram_used_percentage"] = Gauge( + self.__prefix + "vram_used_percentage", "VRAM Memory in Use (%)", labelnames=["card"] + ) # Register remaining metrics of interest available from get_gpu_metrics() for idx, device in enumerate(self.__devices): @@ -153,7 +161,7 @@ def registerMetrics(self): # add Gauge metric only once if metric_name not in self.__GPUMetrics.keys(): - self.__GPUMetrics[metric_name] = Gauge(metric_name,f"{metric}",labelnames=["card"]) + self.__GPUMetrics[metric_name] = Gauge(metric_name, f"{metric}", labelnames=["card"]) return @@ -185,4 +193,3 @@ def collect_data_incremental(self): # Set metric metric.labels(card=cardId).set(value) return - diff --git a/omnistat/monitor.py b/omnistat/monitor.py index 8a3d87b7..7a76ab4e 100644 --- a/omnistat/monitor.py +++ b/omnistat/monitor.py @@ -1,18 +1,18 @@ # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -24,9 +24,9 @@ # Prometheus data collector for HPC systems. # -# Supporting monitor class to implement a prometheus data collector with one +# Supporting monitor class to implement a prometheus data collector with one # or more custom collector(s). -#-- +# -- import configparser import importlib.resources @@ -41,36 +41,44 @@ from omnistat import utils -class Monitor(): - def __init__(self,config): - logging.basicConfig( - format="%(message)s", level=logging.INFO, stream=sys.stdout - ) + +class Monitor: + def __init__(self, config): + logging.basicConfig(format="%(message)s", level=logging.INFO, stream=sys.stdout) self.runtimeConfig = {} - self.runtimeConfig['collector_enable_rocm_smi'] = config['omnistat.collectors'].getboolean('enable_rocm_smi',True) - self.runtimeConfig['collector_enable_slurm'] = config['omnistat.collectors'].getboolean('enable_slurm',False) - self.runtimeConfig['collector_enable_amd_smi'] = config['omnistat.collectors'].getboolean('enable_amd_smi', False) - self.runtimeConfig['collector_enable_amd_smi_process'] = config['omnistat.collectors'].getboolean('enable_amd_smi_process', - False) - self.runtimeConfig['collector_port'] = config['omnistat.collectors'].get('port',8000) - self.runtimeConfig['collector_usermode'] = config['omnistat.collectors'].getboolean('usermode',False) - self.runtimeConfig['collector_rocm_path'] = config['omnistat.collectors'].get('rocm_path','/opt/rocm') + self.runtimeConfig["collector_enable_rocm_smi"] = config["omnistat.collectors"].getboolean( + "enable_rocm_smi", True + ) + self.runtimeConfig["collector_enable_slurm"] = config["omnistat.collectors"].getboolean("enable_slurm", False) + self.runtimeConfig["collector_enable_amd_smi"] = config["omnistat.collectors"].getboolean( + "enable_amd_smi", False + ) + self.runtimeConfig["collector_enable_amd_smi_process"] = config["omnistat.collectors"].getboolean( + "enable_amd_smi_process", False + ) + self.runtimeConfig["collector_port"] = config["omnistat.collectors"].get("port", 8000) + self.runtimeConfig["collector_usermode"] = config["omnistat.collectors"].getboolean("usermode", False) + self.runtimeConfig["collector_rocm_path"] = config["omnistat.collectors"].get("rocm_path", "/opt/rocm") - allowed_ips = config['omnistat.collectors'].get('allowed_ips','127.0.0.1') + allowed_ips = config["omnistat.collectors"].get("allowed_ips", "127.0.0.1") # convert comma-separated string into list - self.runtimeConfig['collector_allowed_ips'] = re.split(r',\s*',allowed_ips) - logging.info("Allowed query IPs = %s" % self.runtimeConfig['collector_allowed_ips']) + self.runtimeConfig["collector_allowed_ips"] = re.split(r",\s*", allowed_ips) + logging.info("Allowed query IPs = %s" % self.runtimeConfig["collector_allowed_ips"]) # additional slurm collector controls - if self.runtimeConfig['collector_enable_slurm'] == True: + if self.runtimeConfig["collector_enable_slurm"] == True: self.jobDetection = {} - self.runtimeConfig['slurm_collector_annotations'] = config['omnistat.collectors.slurm'].getboolean('enable_annotations',False) - self.jobDetection['mode'] = config['omnistat.collectors.slurm'].get('job_detection_mode','file-based') - self.jobDetection['file']= config['omnistat.collectors.slurm'].get('job_detection_file','/tmp/omni_slurmjobinfo') - if config.has_option('omnistat.collectors.slurm','host_skip'): - self.runtimeConfig['slurm_collector_host_skip'] = config['omnistat.collectors.slurm']['host_skip'] + self.runtimeConfig["slurm_collector_annotations"] = config["omnistat.collectors.slurm"].getboolean( + "enable_annotations", False + ) + self.jobDetection["mode"] = config["omnistat.collectors.slurm"].get("job_detection_mode", "file-based") + self.jobDetection["file"] = config["omnistat.collectors.slurm"].get( + "job_detection_file", "/tmp/omni_slurmjobinfo" + ) + if config.has_option("omnistat.collectors.slurm", "host_skip"): + self.runtimeConfig["slurm_collector_host_skip"] = config["omnistat.collectors.slurm"]["host_skip"] # defined global prometheus metrics self.__globalMetrics = {} @@ -80,13 +88,13 @@ def __init__(self,config): self.__collectors = [] # allow for disablement of slurm collector via regex match - if self.runtimeConfig['collector_enable_slurm']: - if config.has_option('omnistat.collectors.slurm','host_skip'): - host_skip = utils.removeQuotes(config['omnistat.collectors.slurm']['host_skip']) - hostname = platform.node().split('.', 1)[0] + if self.runtimeConfig["collector_enable_slurm"]: + if config.has_option("omnistat.collectors.slurm", "host_skip"): + host_skip = utils.removeQuotes(config["omnistat.collectors.slurm"]["host_skip"]) + hostname = platform.node().split(".", 1)[0] p = re.compile(host_skip) if p.match(hostname): - self.runtimeConfig['collector_enable_slurm'] = False + self.runtimeConfig["collector_enable_slurm"] = False logging.info("Disabling SLURM collector via host_skip match (%s)" % host_skip) logging.debug("Completed collector initialization (base class)") @@ -94,21 +102,29 @@ def __init__(self,config): def initMetrics(self): - if self.runtimeConfig['collector_enable_rocm_smi']: + if self.runtimeConfig["collector_enable_rocm_smi"]: from omnistat.collector_smi import ROCMSMI - self.__collectors.append(ROCMSMI(rocm_path=self.runtimeConfig['collector_rocm_path'])) - if self.runtimeConfig['collector_enable_amd_smi']: + + self.__collectors.append(ROCMSMI(rocm_path=self.runtimeConfig["collector_rocm_path"])) + if self.runtimeConfig["collector_enable_amd_smi"]: from omnistat.collector_smi_v2 import AMDSMI + self.__collectors.append(AMDSMI()) - if self.runtimeConfig['collector_enable_amd_smi_process']: + if self.runtimeConfig["collector_enable_amd_smi_process"]: from omnistat.collector_smi_process import AMDSMIProcess + self.__collectors.append(AMDSMIProcess()) - if self.runtimeConfig['collector_enable_slurm']: + if self.runtimeConfig["collector_enable_slurm"]: from omnistat.collector_slurm import SlurmJob - self.__collectors.append(SlurmJob(userMode=self.runtimeConfig['collector_usermode'], - annotations=self.runtimeConfig['slurm_collector_annotations'], - jobDetection=self.jobDetection)) - + + self.__collectors.append( + SlurmJob( + userMode=self.runtimeConfig["collector_usermode"], + annotations=self.runtimeConfig["slurm_collector_annotations"], + jobDetection=self.jobDetection, + ) + ) + # Initialize all metrics for collector in self.__collectors: collector.registerMetrics() diff --git a/omnistat/node_monitoring.py b/omnistat/node_monitoring.py index 6c6fda19..7e63d8bd 100755 --- a/omnistat/node_monitoring.py +++ b/omnistat/node_monitoring.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -52,9 +52,11 @@ from omnistat import utils from omnistat.monitor import Monitor + def shutdown(): os.kill(os.getppid(), signal.SIGTERM) - return jsonify({'message': 'Shutting down...'}), 200 + return jsonify({"message": "Shutting down..."}), 200 + class OmnistatServer(gunicorn.app.base.BaseApplication): def __init__(self, app, options=None): @@ -63,14 +65,14 @@ def __init__(self, app, options=None): super().__init__() def load_config(self): - config = {key: value for key, value in self.options.items() - if key in self.cfg.settings and value is not None} + config = {key: value for key, value in self.options.items() if key in self.cfg.settings and value is not None} for key, value in config.items(): self.cfg.set(key.lower(), value) def load(self): return self.application + def main(): parser = argparse.ArgumentParser() parser.add_argument("--configfile", type=str, help="runtime config file", default=None) @@ -90,23 +92,24 @@ def main(): # Enforce network restrictions @app.before_request def restrict_ips(): - if '0.0.0.0' in monitor.runtimeConfig['collector_allowed_ips']: + if "0.0.0.0" in monitor.runtimeConfig["collector_allowed_ips"]: return - elif request.remote_addr not in monitor.runtimeConfig['collector_allowed_ips']: + elif request.remote_addr not in monitor.runtimeConfig["collector_allowed_ips"]: abort(403) @app.errorhandler(403) def forbidden(e): return jsonify(error="Access denied"), 403 - listenPort = config['omnistat.collectors'].get('port',8000) + listenPort = config["omnistat.collectors"].get("port", 8000) options = { - 'bind': '%s:%s' % ('0.0.0.0', listenPort), - 'workers': 1, + "bind": "%s:%s" % ("0.0.0.0", listenPort), + "workers": 1, } # Launch gunicorn OmnistatServer(app, options).run() -if __name__ == '__main__': + +if __name__ == "__main__": main() diff --git a/omnistat/omni_util.py b/omnistat/omni_util.py index d9e039cd..a842498c 100755 --- a/omnistat/omni_util.py +++ b/omnistat/omni_util.py @@ -48,11 +48,12 @@ from omnistat import utils + class UserBasedMonitoring: def __init__(self): logging.basicConfig(format="%(message)s", level=logging.INFO, stream=sys.stdout) self.scrape_interval = 60 # default scrape interval in seconds - self.timeout = 5 # default scrape timeout in seconds + self.timeout = 5 # default scrape timeout in seconds def setup(self, configFileArgument): self.configFile = utils.findConfigFile(configFileArgument) @@ -162,10 +163,7 @@ def startExporters(self): corebinding = self.runtimeConfig["omnistat.collectors"].get("corebinding", "1") cwd = os.getcwd() - cmd = ( - f"nice -n 20 {sys.executable} -m" - f" omnistat.node_monitoring --configfile={self.configFile}" - ) + cmd = f"nice -n 20 {sys.executable} -m" f" omnistat.node_monitoring --configfile={self.configFile}" # Assume environment is the same across nodes; if numactl is present # here, we expect it to be present in all nodes. @@ -181,8 +179,9 @@ def startExporters(self): "-N %s" % numNodes, "--ntasks-per-node=1", "%s" % sys.executable, - "-m", "omnistat.slurm_env", - "%s" % self.runtimeConfig["omnistat.collectors.slurm"].get("job_detection_file") + "-m", + "omnistat.slurm_env", + "%s" % self.runtimeConfig["omnistat.collectors.slurm"].get("job_detection_file"), ] utils.runShellCommand(srun_cmd, timeout=35, exit_on_error=True) diff --git a/omnistat/query.py b/omnistat/query.py index c4176290..563310a9 100755 --- a/omnistat/query.py +++ b/omnistat/query.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -52,6 +52,7 @@ from omnistat.utils import displayVersion, getVersion, error + class queryMetrics: def __init__(self, versionData): @@ -67,24 +68,24 @@ def __init__(self, versionData): self.version = versionData["version"] def __del__(self): - if hasattr(self,'enable_redirect'): + if hasattr(self, "enable_redirect"): if self.enable_redirect: self.output.close() def read_config(self, configFileArgument): runtimeConfig = utils.readConfig(utils.findConfigFile(configFileArgument)) - section = 'omnistat.query' + section = "omnistat.query" self.config = {} - self.config["system_name"] = runtimeConfig[section].get('system_name','My Snazzy Cluster') - self.config["prometheus_url"] = runtimeConfig[section].get('prometheus_url','unknown') + self.config["system_name"] = runtimeConfig[section].get("system_name", "My Snazzy Cluster") + self.config["prometheus_url"] = runtimeConfig[section].get("prometheus_url", "unknown") def __del__(self): if self.enable_redirect: self.output.close() - def set_options(self,jobID=None,output_file=None,pdf=None,interval=None): + def set_options(self, jobID=None, output_file=None, pdf=None, interval=None): if jobID: - self.jobID=int(jobID) + self.jobID = int(jobID) if output_file: self.output_file = output_file if pdf: @@ -99,7 +100,7 @@ def setup(self): if not os.path.isfile(self.output_file): sys.exit() else: - self.output = open(self.output_file,"a") + self.output = open(self.output_file, "a") sys.stdout = self.output self.enable_redirect = True @@ -112,15 +113,11 @@ def setup(self): print("Job %s has not run yet." % self.jobID) sys.exit(0) - self.start_time = datetime.strptime( - self.jobinfo["begin_date"], "%Y-%m-%dT%H:%M:%S" - ) + self.start_time = datetime.strptime(self.jobinfo["begin_date"], "%Y-%m-%dT%H:%M:%S") if self.jobinfo["end_date"] == "Unknown": self.end_time = datetime.now() else: - self.end_time = datetime.strptime( - self.jobinfo["end_date"], "%Y-%m-%dT%H:%M:%S" - ) + self.end_time = datetime.strptime(self.jobinfo["end_date"], "%Y-%m-%dT%H:%M:%S") # NOOP if job is very short running runtime = (self.end_time - self.start_time).total_seconds() @@ -132,60 +129,63 @@ def setup(self): # Define metrics to report on (set 'title_short' to indicate inclusion in statistics summary) self.metrics = [ - {'metric':'rocm_utilization','title':'GPU Core Utilization','title_short':'Utilization (%)'}, - {'metric':'rocm_vram_used','title':'GPU Memory Used (%)','title_short':'Memory Use (%)'}, - {'metric':'rocm_temp_die_edge','title':'GPU Temperature - Die Edge (C)','title_short':'Temperature (C)'}, - {'metric':'rocm_sclk_clock_mhz','title':'GPU Clock Frequency (MHz)'}, - {'metric':'rocm_avg_pwr','title':'GPU Average Power (W)','title_short':'Power (W)'} - ] + {"metric": "rocm_utilization", "title": "GPU Core Utilization", "title_short": "Utilization (%)"}, + {"metric": "rocm_vram_used", "title": "GPU Memory Used (%)", "title_short": "Memory Use (%)"}, + { + "metric": "rocm_temp_die_edge", + "title": "GPU Temperature - Die Edge (C)", + "title_short": "Temperature (C)", + }, + {"metric": "rocm_sclk_clock_mhz", "title": "GPU Clock Frequency (MHz)"}, + {"metric": "rocm_avg_pwr", "title": "GPU Average Power (W)", "title_short": "Power (W)"}, + ] # Query job data info given start/stop time window - def query_jobinfo(self,start,end): + def query_jobinfo(self, start, end): duration_mins = (end - start).total_seconds() / 60 - assert(duration_mins > 0) + assert duration_mins > 0 # assemble coarsened query step based on job duration if duration_mins > 60: - step = '1h' + step = "1h" elif duration_mins > 15: - step = '15m' + step = "15m" elif duration_mins > 5: - step = '5m' + step = "5m" else: - step = '1m' + step = "1m" # Cull job info - results = self.prometheus.custom_query_range('(slurmjob_info{jobid="%s"})' % self.jobID, - start,end,step=step) + results = self.prometheus.custom_query_range('(slurmjob_info{jobid="%s"})' % self.jobID, start, end, step=step) - assert(len(results) > 0) - num_nodes = int(results[0]['metric']['nodes']) - partition = results[0]['metric']['partition'] - assert(num_nodes > 0) + assert len(results) > 0 + num_nodes = int(results[0]["metric"]["nodes"]) + partition = results[0]["metric"]["partition"] + assert num_nodes > 0 jobdata = {} jobdata["begin_date"] = start.strftime("%Y-%m-%dT%H:%M:%S") - jobdata["end_date"] = end.strftime("%Y-%m-%dT%H:%M:%S") - jobdata["num_nodes"] = num_nodes - jobdata["partition"] = partition + jobdata["end_date"] = end.strftime("%Y-%m-%dT%H:%M:%S") + jobdata["num_nodes"] = num_nodes + jobdata["partition"] = partition # Cull number of gpus - results = self.prometheus.custom_query_range('(rocm_num_gpus * on (instance) slurmjob_info{jobid="%s"})' % self.jobID, - start,end,step=step) - assert(len(results) == num_nodes) - num_gpus = int(results[0]['values'][0][1]) + results = self.prometheus.custom_query_range( + '(rocm_num_gpus * on (instance) slurmjob_info{jobid="%s"})' % self.jobID, start, end, step=step + ) + assert len(results) == num_nodes + num_gpus = int(results[0]["values"][0][1]) # warn if nodes do not have same gpu counts for node in range(len(results)): - value = int(results[node]['values'][0][1]) + value = int(results[node]["values"][0][1]) if value != num_gpus: - print("[WARNING]: compute nodes detected with differning number of GPUs (%i,%i) " % (num_gpus,value)) + print("[WARNING]: compute nodes detected with differning number of GPUs (%i,%i) " % (num_gpus, value)) break - assert(num_gpus > 0) + assert num_gpus > 0 self.numGPUs = num_gpus return jobdata - # gather relevant job data from info metric def query_slurm_job_internal(self): @@ -197,17 +197,18 @@ def query_slurm_job_internal(self): # loop over days starting from now to find time window covering desired job for day in range(365): aend = now - timedelta(days=day) - astart = (aend - timedelta(days=1)) + astart = aend - timedelta(days=1) - results = self.prometheus.custom_query_range('max(slurmjob_info{jobid="%s"})' % self.jobID, - astart,aend,step='1m') + results = self.prometheus.custom_query_range( + 'max(slurmjob_info{jobid="%s"})' % self.jobID, astart, aend, step="1m" + ) if not lastTimestamp and len(results) > 0: - lastTimestamp = datetime.fromtimestamp(results[0]['values'][-1][0]) + lastTimestamp = datetime.fromtimestamp(results[0]["values"][-1][0]) endWindow = aend - firstTimestamp = datetime.fromtimestamp(results[0]['values'][0][0]) + firstTimestamp = datetime.fromtimestamp(results[0]["values"][0][0]) continue elif lastTimestamp and len(results) > 0: - firstTimestamp = datetime.fromtimestamp(results[0]['values'][0][0]) + firstTimestamp = datetime.fromtimestamp(results[0]["values"][0][0]) continue elif lastTimestamp and len(results) == 0: break @@ -217,9 +218,9 @@ def query_slurm_job_internal(self): sys.exit(1) # expand job window to nearest minute - firstTimestamp = firstTimestamp.replace(second=0,microsecond=0) + firstTimestamp = firstTimestamp.replace(second=0, microsecond=0) lastTimestamp += timedelta(minutes=1) - lastTimestamp = lastTimestamp.replace(second=0,microsecond=0) + lastTimestamp = lastTimestamp.replace(second=0, microsecond=0) jobdata = self.query_jobinfo(firstTimestamp, lastTimestamp) return jobdata @@ -237,7 +238,7 @@ def query_slurm_job(self): "--format", "Start,End,NNodes,Partition", ] - path = shutil.which('sacct') + path = shutil.which("sacct") if path is None: print("[ERROR]: unable to resolve 'sacct' binary") sys.exit(1) @@ -268,8 +269,7 @@ def query_slurm_job(self): def get_hosts(self): self.hosts = [] results = self.prometheus.custom_query_range( - 'card0_rocm_utilization * on (instance) slurmjob_info{jobid="%s"}' - % self.jobID, + 'card0_rocm_utilization * on (instance) slurmjob_info{jobid="%s"}' % self.jobID, self.start_time, self.end_time, step=60, @@ -277,23 +277,22 @@ def get_hosts(self): for result in results: self.hosts.append(result["metric"]["instance"]) - # def get_num_gpus(self): # self.numGPUs = 0 # if self.jobinfo["partition"] in self.config: # if "num_gpus" in self.config[self.jobinfo["partition"]]: # self.numGPUs = self.config[self.jobinfo["partition"]]["num_gpus"] - def gather_data(self,saveTimeSeries=False): + def gather_data(self, saveTimeSeries=False): self.stats = {} self.time_series = {} self.max_GPU_memory_avail = [] for entry in self.metrics: - metric = entry['metric'] + metric = entry["metric"] - self.stats[metric + "_min"] = [] - self.stats[metric + "_max"] = [] + self.stats[metric + "_min"] = [] + self.stats[metric + "_max"] = [] self.stats[metric + "_mean"] = [] if saveTimeSeries: @@ -302,10 +301,10 @@ def gather_data(self,saveTimeSeries=False): for gpu in range(self.numGPUs): # (1) capture time series that assembles [mean] value at each timestamp across all assigned nodes - times,values_mean = self.query_time_series_data("card" + str(gpu) + "_" + metric,"avg") + times, values_mean = self.query_time_series_data("card" + str(gpu) + "_" + metric, "avg") # (2) capture time series that assembles [max] value at each timestamp across all assigned nodes - times,values_max = self.query_time_series_data("card" + str(gpu) + "_" + metric,"max") + times, values_max = self.query_time_series_data("card" + str(gpu) + "_" + metric, "max") # if gpu == 0: # for i in range(len(times)): @@ -315,10 +314,10 @@ def gather_data(self,saveTimeSeries=False): self.stats[metric + "_max"].append(np.max(values_max)) self.stats[metric + "_mean"].append(np.mean(values_mean)) - if metric == 'rocm_vram_used': + if metric == "rocm_vram_used": # compute % memory used - times2,values2_min = self.query_time_series_data("card" + str(gpu) + "_rocm_vram_total","min") - times2,values2_max = self.query_time_series_data("card" + str(gpu) + "_rocm_vram_total","max") + times2, values2_min = self.query_time_series_data("card" + str(gpu) + "_rocm_vram_total", "min") + times2, values2_max = self.query_time_series_data("card" + str(gpu) + "_rocm_vram_total", "max") memoryMin = np.min(values2_min) memoryMax = np.max(values2_max) @@ -327,20 +326,19 @@ def gather_data(self,saveTimeSeries=False): sys.exit(1) memoryAvail = memoryMax - self.stats[metric + "_max"] [-1] = 100.0 * self.stats[metric + "_max"] [-1] / memoryAvail + self.stats[metric + "_max"][-1] = 100.0 * self.stats[metric + "_max"][-1] / memoryAvail self.stats[metric + "_mean"][-1] = 100.0 * self.stats[metric + "_mean"][-1] / memoryAvail self.max_GPU_memory_avail.append(memoryAvail) values_mean = 100.0 * values_mean / memoryAvail - values_max = 100.0 * values_max / memoryAvail + values_max = 100.0 * values_max / memoryAvail if saveTimeSeries: - self.time_series[metric].append({'time':times,'values':values_mean}) + self.time_series[metric].append({"time": times, "values": values_mean}) return def generate_report_card(self): system = self.config["system_name"] - print("") print("-" * 40) print("Omnistat Report Card for Job # %i" % self.jobID) @@ -352,26 +350,29 @@ def generate_report_card(self): print("") print("GPU Statistics:") print("") - print(" %6s |" % "",end='') + print(" %6s |" % "", end="") for entry in self.metrics: - if 'title_short' in entry: - #print("%16s |" % entry['title_short'],end='') - print(" %s |" % entry['title_short'].center(16),end='') + if "title_short" in entry: + # print("%16s |" % entry['title_short'],end='') + print(" %s |" % entry["title_short"].center(16), end="") print("") - print(" %6s |" % "GPU #",end='') + print(" %6s |" % "GPU #", end="") for entry in self.metrics: - if 'title_short' in entry: - print(" %8s%8s |" % ("Max".center(6),"Mean".center(6)),end='') + if "title_short" in entry: + print(" %8s%8s |" % ("Max".center(6), "Mean".center(6)), end="") print("") print(" " + "-" * 84) for card in range(self.numGPUs): - print(" %6s |" % card,end='') + print(" %6s |" % card, end="") for entry in self.metrics: - if 'title_short' not in entry: + if "title_short" not in entry: continue - metric = entry['metric'] - print(" %6.2f %6.2f |" % (self.stats[metric + "_max"][card],self.stats[metric + "_mean"][card]),end='') + metric = entry["metric"] + print( + " %6.2f %6.2f |" % (self.stats[metric + "_max"][card], self.stats[metric + "_mean"][card]), + end="", + ) print("") print("") @@ -383,68 +384,62 @@ def generate_report_card(self): print("Version = %s" % version) return - - - def query_time_series_data(self,metric_name,reducer=None,dataType=float): + def query_time_series_data(self, metric_name, reducer=None, dataType=float): if reducer is None: results = self.prometheus.custom_query_range( - '(%s * on (instance) slurmjob_info{jobid="%s"})' - % (metric_name, self.jobID), + '(%s * on (instance) slurmjob_info{jobid="%s"})' % (metric_name, self.jobID), self.start_time, self.end_time, step=self.interval, ) else: results = self.prometheus.custom_query_range( - '%s(%s * on (instance) group_left() slurmjob_info{jobid="%s"})' - % (reducer, metric_name, self.jobID), + '%s(%s * on (instance) group_left() slurmjob_info{jobid="%s"})' % (reducer, metric_name, self.jobID), self.start_time, self.end_time, step=self.interval, ) - results = np.asarray(results[0]['values']) + results = np.asarray(results[0]["values"]) # convert to time format - time = results[:,0].astype(int).astype('datetime64[s]') - #time = results[:,0].astype(int) + time = results[:, 0].astype(int).astype("datetime64[s]") + # time = results[:,0].astype(int) # let user decide on conversion type for gauge metric if dataType == int: - values = results[:,1].astype(int) + values = results[:, 1].astype(int) elif dataType == float: - values = results[:,1].astype(float) + values = results[:, 1].astype(float) - return time,values + return time, values - def query_gpu_metric(self,metricName): + def query_gpu_metric(self, metricName): stats = {} - stats['mean'] = [] - stats['max'] = [] + stats["mean"] = [] + stats["max"] = [] for gpu in range(self.numGPUs): metric = "card" + str(gpu) + "_" + metricName - #-- + # -- # Mean results results = self.prometheus.custom_query_range( - 'avg(%s * on (instance) slurmjob_info{jobid="%s"})' - % (metric, self.jobID), + 'avg(%s * on (instance) slurmjob_info{jobid="%s"})' % (metric, self.jobID), self.start_time, self.end_time, step=60, ) - + assert len(results) == 1 data = results[0]["values"] data2 = np.asarray(data, dtype=float) - stats['mean'].append(np.mean(data2[:,1])) + stats["mean"].append(np.mean(data2[:, 1])) - #-- + # -- # Max results results = self.prometheus.custom_query_range( - 'max(%s * on (instance) slurmjob_info{jobid="%s"})' - % (metric, self.jobID), + 'max(%s * on (instance) slurmjob_info{jobid="%s"})' % (metric, self.jobID), self.start_time, self.end_time, step=60, @@ -453,99 +448,125 @@ def query_gpu_metric(self,metricName): assert len(results) == 1 data = results[0]["values"] data2 = np.asarray(data, dtype=float) - stats['max'].append(np.max(data2[:,1])) - - return(stats) - - def dumpFile(self,outputFile): - doc = SimpleDocTemplate(outputFile,pagesize=letter, - rightMargin=1 * inch,leftMargin=1 * inch, - topMargin=62,bottomMargin=18,showBoundary=0) - + stats["max"].append(np.max(data2[:, 1])) + + return stats + + def dumpFile(self, outputFile): + doc = SimpleDocTemplate( + outputFile, + pagesize=letter, + rightMargin=1 * inch, + leftMargin=1 * inch, + topMargin=62, + bottomMargin=18, + showBoundary=0, + ) + styles = getSampleStyleSheet() - normal = ParagraphStyle('normal') - Story=[] - Story.append(Spacer(1,0.1*inch)) - Story.append(HRFlowable(width="100%",thickness=2)) - ptext=''' + normal = ParagraphStyle("normal") + Story = [] + Story.append(Spacer(1, 0.1 * inch)) + Story.append(HRFlowable(width="100%", thickness=2)) + ptext = """ HPC Report Card: JobID = %s
Start Time: %s
End Time: %s
- ''' % (self.jobID,self.start_time,self.end_time.strftime("%Y-%m-%d %H:%M:%S")) + """ % ( + self.jobID, + self.start_time, + self.end_time.strftime("%Y-%m-%d %H:%M:%S"), + ) Story.append(Paragraph(ptext, styles["Bullet"])) - Story.append(HRFlowable(width="100%",thickness=2)) - -# JobID: %s
+ Story.append(HRFlowable(width="100%", thickness=2)) + + # JobID: %s
# generate Utilization Table - Story.append(Spacer(1,0.2*inch)) - ptext='''GPU Statistics''' - Story.append(Paragraph(ptext,normal)) - Story.append(Spacer(1,0.2*inch)) - #Story.append(HRFlowable(width="100%",thickness=1)) + Story.append(Spacer(1, 0.2 * inch)) + ptext = """GPU Statistics""" + Story.append(Paragraph(ptext, normal)) + Story.append(Spacer(1, 0.2 * inch)) + # Story.append(HRFlowable(width="100%",thickness=1)) - #-- + # -- # Display general GPU Statistics - #-- + # -- data = [] - data.append(['','Utilization (%)','','Memory Use (%)','','Temperature (C)','','Power (W)','']) - data.append(['GPU #','Max','Mean','Max','Mean','Max','Mean','Max','Mean']) + data.append(["", "Utilization (%)", "", "Memory Use (%)", "", "Temperature (C)", "", "Power (W)", ""]) + data.append(["GPU #", "Max", "Mean", "Max", "Mean", "Max", "Mean", "Max", "Mean"]) for gpu in range(self.numGPUs): - data.append([gpu, - "%.2f" % self.stats['rocm_utilization_max'][gpu], "%.2f" % self.stats['rocm_utilization_mean'][gpu], - "%.2f" % self.stats['rocm_vram_used_max'][gpu], "%.2f" % self.stats['rocm_vram_used_mean'][gpu], - "%.2f" % self.stats['rocm_temp_die_edge_max'][gpu], "%.2f" % self.stats['rocm_temp_die_edge_mean'][gpu], - "%.2f" % self.stats['rocm_avg_pwr_max'][gpu], "%.2f" % self.stats['rocm_avg_pwr_mean'][gpu] - ]) - - t=Table(data,rowHeights=[.21*inch] * len(data), - colWidths=[0.55*inch,0.72*inch]) - t.hAlign='LEFT' - t.setStyle(TableStyle([('LINEBELOW',(0,1),(-1,1),1.5,colors.black), - ('ALIGN',(0,0),(-1,-1),'CENTER')])) - t.setStyle(TableStyle([('LINEBEFORE',(1,0),(1,-1),1.25,colors.darkgrey), - ('LINEAFTER', (2,0),(2,-1),1.25,colors.darkgrey), - ('LINEAFTER', (4,0),(4,-1),1.25,colors.darkgrey), - ('LINEAFTER', (6,0),(6,-1),1.25,colors.darkgrey) - ])) - t.setStyle(TableStyle([('SPAN',(1,0),(2,0)), - ('SPAN',(3,0),(4,0)), - ('SPAN',(5,0),(6,0)), - ('SPAN',(7,0),(8,0)) - ])) - - for each in range(2,len(data)): + data.append( + [ + gpu, + "%.2f" % self.stats["rocm_utilization_max"][gpu], + "%.2f" % self.stats["rocm_utilization_mean"][gpu], + "%.2f" % self.stats["rocm_vram_used_max"][gpu], + "%.2f" % self.stats["rocm_vram_used_mean"][gpu], + "%.2f" % self.stats["rocm_temp_die_edge_max"][gpu], + "%.2f" % self.stats["rocm_temp_die_edge_mean"][gpu], + "%.2f" % self.stats["rocm_avg_pwr_max"][gpu], + "%.2f" % self.stats["rocm_avg_pwr_mean"][gpu], + ] + ) + + t = Table(data, rowHeights=[0.21 * inch] * len(data), colWidths=[0.55 * inch, 0.72 * inch]) + t.hAlign = "LEFT" + t.setStyle( + TableStyle([("LINEBELOW", (0, 1), (-1, 1), 1.5, colors.black), ("ALIGN", (0, 0), (-1, -1), "CENTER")]) + ) + t.setStyle( + TableStyle( + [ + ("LINEBEFORE", (1, 0), (1, -1), 1.25, colors.darkgrey), + ("LINEAFTER", (2, 0), (2, -1), 1.25, colors.darkgrey), + ("LINEAFTER", (4, 0), (4, -1), 1.25, colors.darkgrey), + ("LINEAFTER", (6, 0), (6, -1), 1.25, colors.darkgrey), + ] + ) + ) + t.setStyle( + TableStyle( + [("SPAN", (1, 0), (2, 0)), ("SPAN", (3, 0), (4, 0)), ("SPAN", (5, 0), (6, 0)), ("SPAN", (7, 0), (8, 0))] + ) + ) + + for each in range(2, len(data)): if each % 2 == 0: bg_color = colors.lightgrey else: bg_color = colors.whitesmoke - t.setStyle(TableStyle([('BACKGROUND', (0, each), (-1, each), bg_color)])) + t.setStyle(TableStyle([("BACKGROUND", (0, each), (-1, each), bg_color)])) Story.append(t) - #-- + # -- # Display time-series plots - #-- + # -- - Story.append(Spacer(1,0.2*inch)) - Story.append(HRFlowable(width="100%",thickness=1)) - Story.append(Spacer(1,0.2*inch)) - ptext='''Time Series''' - Story.append(Paragraph(ptext,normal)) - Story.append(Spacer(1,0.2*inch)) + Story.append(Spacer(1, 0.2 * inch)) + Story.append(HRFlowable(width="100%", thickness=1)) + Story.append(Spacer(1, 0.2 * inch)) + ptext = """Time Series""" + Story.append(Paragraph(ptext, normal)) + Story.append(Spacer(1, 0.2 * inch)) for entry in self.metrics: - metric = entry['metric'] - plt.figure(figsize=(9,2.5)) + metric = entry["metric"] + plt.figure(figsize=(9, 2.5)) for gpu in range(self.numGPUs): - plt.plot(self.time_series[metric][gpu]['time'], - self.time_series[metric][gpu]['values'],linewidth=0.4,label='Card %i' % gpu) -# self.time_series[metric][gpu]['values'],marker='o',markersize=1,linewidth=0.4,label='Card %i' % gpu) - - plt.title(entry['title']) - plt.legend(bbox_to_anchor=(1.,0.5),loc='center left', ncol=1,frameon=True) + plt.plot( + self.time_series[metric][gpu]["time"], + self.time_series[metric][gpu]["values"], + linewidth=0.4, + label="Card %i" % gpu, + ) + # self.time_series[metric][gpu]['values'],marker='o',markersize=1,linewidth=0.4,label='Card %i' % gpu) + + plt.title(entry["title"]) + plt.legend(bbox_to_anchor=(1.0, 0.5), loc="center left", ncol=1, frameon=True) plt.grid() ax = plt.gca() @@ -553,36 +574,36 @@ def dumpFile(self,outputFile): formatter = mdates.ConciseDateFormatter(locator) ax.xaxis.set_major_locator(locator) ax.xaxis.set_major_formatter(formatter) - plt.savefig('.utilization.png',dpi=150,bbox_inches='tight') + plt.savefig(".utilization.png", dpi=150, bbox_inches="tight") plt.close() - aplot = Image('.utilization.png') - aplot.hAlign='LEFT' + aplot = Image(".utilization.png") + aplot.hAlign = "LEFT" aplot._restrictSize(6.5 * inch, 4 * inch) Story.append(aplot) - os.remove('.utilization.png') + os.remove(".utilization.png") - Story.append(Spacer(1,0.2*inch)) - Story.append(HRFlowable(width="100%",thickness=1)) + Story.append(Spacer(1, 0.2 * inch)) + Story.append(HRFlowable(width="100%", thickness=1)) - footerStyle = ParagraphStyle('footer', - fontSize=8, - parent=styles['Normal'], + footerStyle = ParagraphStyle( + "footer", + fontSize=8, + parent=styles["Normal"], ) - ptext='''Query execution time = %.1f secs''' % (timeit.default_timer() - self.timer_start) - Story.append(Paragraph(ptext,footerStyle)) + ptext = """Query execution time = %.1f secs""" % (timeit.default_timer() - self.timer_start) + Story.append(Paragraph(ptext, footerStyle)) version = self.version if self.sha != "Unknown": version += " (%s)" % self.sha - ptext='''Version = %s''' % version - Story.append(Paragraph(ptext,footerStyle)) - Story.append(HRFlowable(width="100%",thickness=1)) + ptext = """Version = %s""" % version + Story.append(Paragraph(ptext, footerStyle)) + Story.append(HRFlowable(width="100%", thickness=1)) # Build the .pdf doc.build(Story) - - return + return def main(): @@ -591,7 +612,7 @@ def main(): parser = argparse.ArgumentParser() parser.add_argument("--configfile", type=str, help="runtime config file", default=None) parser.add_argument("--job", help="jobId to query") - parser.add_argument("--interval",type=int,help="sampling interval in secs (default=60)",default=60) + parser.add_argument("--interval", type=int, help="sampling interval in secs (default=60)", default=60) parser.add_argument("--output", help="location for stdout report") parser.add_argument("-v", "--version", help="print version info and exit", action="store_true") parser.add_argument("--pdf", help="generate PDF report") @@ -617,5 +638,6 @@ def main(): if args.pdf: query.dumpFile(args.pdf) + if __name__ == "__main__": main() diff --git a/omnistat/slurm_env.py b/omnistat/slurm_env.py index 61449263..0e861809 100755 --- a/omnistat/slurm_env.py +++ b/omnistat/slurm_env.py @@ -1,19 +1,19 @@ #!/usr/bin/env python3 # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -38,6 +38,7 @@ jobData = {} jobFile = "/tmp/omni_slurmjobinfo" + def main(): if len(sys.argv) > 1: jobFile = sys.argv[1] @@ -52,11 +53,12 @@ def main(): else: jobData["SLURM_JOB_BATCHMODE"] = 1 - json.dump(jobData,open(jobFile,"w"),indent=4) + json.dump(jobData, open(jobFile, "w"), indent=4) else: print("ERROR: SLURM settings not visible in current environment. Verify running in active job") sys.exit(1) - -if __name__ == '__main__': + + +if __name__ == "__main__": main() diff --git a/omnistat/utils.py b/omnistat/utils.py index cca70395..645da33d 100644 --- a/omnistat/utils.py +++ b/omnistat/utils.py @@ -1,18 +1,18 @@ # ------------------------------------------------------------------------------- # MIT License -# +# # Copyright (c) 2023 - 2024 Advanced Micro Devices, Inc. All Rights Reserved. -# +# # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in all # copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -34,6 +34,7 @@ from pathlib import Path + def convert_bdf_to_gpuid(bdf_string): """ Converts BDF text string in hex format to a GPU location id in the form written by kfd driver @@ -46,27 +47,29 @@ def convert_bdf_to_gpuid(bdf_string): int: location_id """ - domain = int(bdf_string.split(':')[0],16) + domain = int(bdf_string.split(":")[0], 16) # strip leading domain - bdf = bdf_string .split(':')[1:] + bdf = bdf_string.split(":")[1:] # cull out bus, device, and function as ints - bus = int(bdf[0],16) - dev_func = bdf[1].split('.') - device = int(dev_func[0],16) - function = int(dev_func[1],16) + bus = int(bdf[0], 16) + dev_func = bdf[1].split(".") + device = int(dev_func[0], 16) + function = int(dev_func[1], 16) # assemble id per kfd driver - location_id = ((bus << 8) | function) + location_id = (bus << 8) | function return location_id + def pass_through_indexing(numGpus): """returns a pass through GPU indexingwith 0:0, 1:1, etc. Intended for use in cases where - exact mapping cannot be ascertained by reading sysfs topology files. + exact mapping cannot be ascertained by reading sysfs topology files. """ gpu_index_mapping = {} for i in range(numGpus): gpu_index_mapping[i] = str(i) return gpu_index_mapping + def gpu_index_mapping(bdfMapping, expectedNumGPUs): """Generate a mapping between kfd gpu indexing (SMI lib) to those of HIP_VISIBLE_DEVICES. Intended for use with metric labeling to identify devices based on HIP_VISIBLE_DEVICES indexing. @@ -79,7 +82,7 @@ def gpu_index_mapping(bdfMapping, expectedNumGPUs): dict: maps kfd indices to HIP_VISIBLE_DEVICES indices """ kfd_nodes = "/sys/class/kfd/kfd/topology/nodes" - logging.info("GPU topology indexing: Scanning devices from %s"% kfd_nodes) + logging.info("GPU topology indexing: Scanning devices from %s" % kfd_nodes) if not os.path.isdir(kfd_nodes): logging.warn("--> directory not found") return pass_through_indexing(expectedNumGPUs) @@ -95,8 +98,8 @@ def gpu_index_mapping(bdfMapping, expectedNumGPUs): properties = {} with open(file) as f: for line in f: - key, value = line.strip().split(' ') - if key == 'location_id': + key, value = line.strip().split(" ") + if key == "location_id": location_id = int(value) if location_id == 0: numNonGPUs += 1 @@ -109,7 +112,7 @@ def gpu_index_mapping(bdfMapping, expectedNumGPUs): return pass_through_indexing(expectedNumGPUs) if numGPUs != expectedNumGPUs: - logging.warn("--> did not detect expected number of GPUs in sysfs (%i vs %i)" % (numGPUs,expectedNumGPUs)) + logging.warn("--> did not detect expected number of GPUs in sysfs (%i vs %i)" % (numGPUs, expectedNumGPUs)) return pass_through_indexing(expectedNumGPUs) gpuMappingOrder = {} @@ -123,6 +126,7 @@ def gpu_index_mapping(bdfMapping, expectedNumGPUs): logging.info("--> Mapping: %s" % gpuMappingOrder) return gpuMappingOrder + def error(message): """Log an error message and exit @@ -133,7 +137,7 @@ def error(message): sys.exit(1) -def findConfigFile(configFileArgument = None): +def findConfigFile(configFileArgument=None): """Identify configuration file location Try to find one of the following locations in the filesystem: @@ -209,9 +213,9 @@ def runShellCommand(command, capture_output=True, text=True, exit_on_error=False return results -def runBGProcess(command, outputFile=".bgcommand.output", mode='w'): +def runBGProcess(command, outputFile=".bgcommand.output", mode="w"): logging.debug("Command to run in background = %s" % command) - #results = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=open(outputFile,"w")) + # results = subprocess.Popen(command,stdout=subprocess.PIPE,stderr=open(outputFile,"w")) outfile = open(outputFile, mode) results = subprocess.Popen(command, stdout=outfile, stderr=outfile) @@ -264,7 +268,7 @@ def removeQuotes(input): def getVersion(): """Return omnistat version info""" try: - return version('omnistat') + return version("omnistat") except importlib.metadata.PackageNotFoundError: # When package is not installed, rely on setuptools-git-versioning # to figure out the version; use the executable because the internal diff --git a/test/test_integration.py b/test/test_integration.py index bf2712bf..f02456dc 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -12,6 +12,7 @@ # ROCm is installed if we can find `rocminfo' in the host. rocm_host = True if shutil.which("rocminfo") else False + class TestIntegration: url = "http://localhost:9090/" node = "node:8000" @@ -66,11 +67,12 @@ def test_job(self): assert jobid == last_jobid + 1, "One job should have been executed" num_samples = len(job["values"]) - assert num_samples == job_seconds or num_samples == job_seconds + 1, \ - "Expected approximately one sample per second" + assert ( + num_samples == job_seconds or num_samples == job_seconds + 1 + ), "Expected approximately one sample per second" # Execute an empty job lasting a given amount of seconds def run_job(self, seconds): - sbatch_cmd = f"sbatch --wrap=\"sleep {seconds}\"" + sbatch_cmd = f'sbatch --wrap="sleep {seconds}"' exec_cmd = f"docker exec slurm-controller-1 bash -c 'cd /jobs; {sbatch_cmd}'" os.system(exec_cmd)