diff --git a/pyproject.toml b/pyproject.toml index 7e86cad..b597cc6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,21 +27,21 @@ classifiers = [ ] dependencies = [ - "pika>=1.3.2", - "psutil>=5.9.2", - "PyYAML>=6.0.1", - "requests>=2.31.0", - "fastapi>=0.110.2", - "flatten_dict>=0.4.2", - "pydantic>=2.7.4,<2.8", - "tinyflux>=1.0.0", - "uvicorn>=0.29.0", - "lazy-import>=0.2.2", - "confluent-kafka>=2.4.0", - "docker==7.1.0", - "numpy>=1.23.5", - "pandas~=1.4.3", - "tox-gh-actions>=3.2.0", + "pika>=1.3.2", + "psutil>=5.9.2", + "PyYAML>=6.0.1", + "requests>=2.31.0", + "fastapi>=0.110.2", + "flatten_dict>=0.4.2", + "pydantic>=2.7.4,<2.8", + "tinyflux>=1.0.0", + "uvicorn>=0.29.0", + "lazy-import>=0.2.2", + "confluent-kafka>=2.4.0", + "docker==7.1.0", + "numpy>=1.23.5", + "pandas~=1.4.3", + "tox-gh-actions>=3.2.0", ] optional-dependencies.ml = [ diff --git a/src/qoa4ml/utils/jetson/command.py b/src/qoa4ml/utils/jetson/command.py new file mode 100644 index 0000000..e063165 --- /dev/null +++ b/src/qoa4ml/utils/jetson/command.py @@ -0,0 +1,113 @@ +import logging +import os + +# Launch command +import subprocess as sp +import sys +import threading +from queue import Queue + +# Create logger +logger = logging.getLogger(__name__) +EXTRA_TIMEOUT = 1.0 +# Reference: +# https://eli.thegreenplace.net/2017/interacting-with-a-long-running-child-process-in-python/ +# https://stackoverflow.com/questions/37942022/returncode-of-popen-object-is-none-after-the-process-is-terminated/42376107 +# https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python +# https://docs.python.org/3/tutorial/errors.html +# https://stackoverflow.com/questions/10756383/timeout-on-subprocess-readline-in-python +# https://stackoverflow.com/questions/3733270/python-subprocess-timeout + + +class Command: + class CommandError(Exception): + def __init__(self, message, errno): + self.message = message + self.errno = errno + + def __str__(self): + return f"[errno:{self.errno}] {self.message}" + + class TimeoutError(CommandError): + def __init__(self): + super().__init__("Process does not replied in time", -1) + + @staticmethod + def run_command(command, repeat=5, timeout=2): + cmd = Command(command) + for idx in range(repeat): + try: + return cmd(timeout=timeout) + except Command.TimeoutError as error: + logger.error(f"[{idx}] {error}") + raise Command.CommandError(f"Error to start {command}", -2) + + def __init__(self, command): + self.process = None + self.command = command + + def __call__(self, timeout=None): + def target(out_queue, err_queue): + # Run process + try: + # https://stackoverflow.com/questions/33277452/prevent-unexpected-stdin-reads-and-lock-in-subprocess + self.process = sp.Popen( + self.command, + stdout=sp.PIPE, + stderr=sp.PIPE, + stdin=open(os.devnull), + preexec_fn=os.setsid, + ) + # Read lines output + for line in iter(self.process.stdout.readline, b""): + try: + line = line.decode("utf-8") + line = str(line.strip()) + except UnicodeEncodeError: + line = line.encode("ascii", "ignore").decode("ascii") + out_queue.put(line) + # Close and terminate + self.process.stdout.close() + self.process.wait() + except Exception: + # Store error message + err_queue.put(sys.exc_info()) + + # Initialize lists + is_timeout = False + out_queue = Queue() + err_queue = Queue() + thread = threading.Thread( + target=target, + args=( + out_queue, + err_queue, + ), + ) + thread.start() + # Wait timeout process + thread.join(timeout) + if thread.is_alive(): + logger.error(f"Terminating process: {self.command}") + if self.process is not None: + self.process.terminate() + thread.join(timeout=EXTRA_TIMEOUT) + logger.warning(f"Process terminated: {self.command}") + is_timeout = True + # Read the output + # Extract exception and raise + if not err_queue.empty(): + ex_type, ex_value, tb_str = err_queue.get() + ex_value.__traceback__ = tb_str + raise ex_value + if is_timeout: + raise Command.TimeoutError() + if self.process.returncode != 0: + raise Command.CommandError("Error process:", self.process.returncode) + return list(out_queue.queue) + + def communicate(self, timeout=None): + self.__call__(timeout=timeout) + + +# EOF diff --git a/src/qoa4ml/utils/jetson/common.py b/src/qoa4ml/utils/jetson/common.py new file mode 100644 index 0000000..9a6ad54 --- /dev/null +++ b/src/qoa4ml/utils/jetson/common.py @@ -0,0 +1,243 @@ +import array +import fcntl + +# Logging +import logging +import os +import re + +# Socket and IP information +import socket +import struct + +# Launch command +import subprocess as sp +from base64 import b64encode +from random import choice +from string import ascii_letters + +# Load Author +AUTH_RE = re.compile(r""".*__author__ = ["'](.*?)['"]""", re.S) +# Create logger +logger = logging.getLogger(__name__) + +# Max possible bytes for interface result. Will truncate if more than 4096 characters to describe interfaces. +MAX_BYTES = 4096 +# We're going to make a blank byte array to operate on. This is our fill char. +FILL_CHAR = b"\0" +# Command defined in ioctl.h for the system operation for get iface list +# Defined at https://code.woboq.org/qt5/include/bits/ioctls.h.html under +# /* Socket configuration controls. */ section. +SIOCGIFCONF = 0x8912 + + +class GenericInterface: + def __init__(self): + self._controller = None + self._init = None + self._data = {} + + def _initialize(self, controller, init=None): + if init is None: + init = {} + self._controller = controller + self._init = init + + def _update(self, data): + self._data = data + + def items(self): + return self._data.items() + + def keys(self): + return self._data.keys() + + def values(self): + return self._data.values() + + def get(self, key, default=None): + return self._data.get(key, default) + + def __len__(self): + return len(self._data) + + def __getitem__(self, key): + return self._data[key] + + def __contains__(self, key): + return key in self._data + + def __iter__(self): + return iter(self._data) + + def __reversed__(self): + return reversed(self._data) + + def __missing__(self, key): + raise KeyError(key) + + def __eq__(self, other): + if isinstance(other, GenericInterface): + return self._data == other._data + elif isinstance(other, dict): + return self._data == other + else: + return NotImplemented + + def __ne__(self, other): + result = self.__eq__(other) + if result is NotImplemented: + return result + else: + return not result + + def __str__(self): + return str(self._data) + + def __repr__(self): + return repr(self._data) + + +def check_file(path): + return os.path.isfile(path) and os.access(path, os.R_OK) + + +def cat(path): + with open(path) as f: + return f.readline().rstrip("\x00") + + +def locate_commands(name, commands): + for cmd in commands: + if os.path.exists(cmd): + return cmd + return None + + +def import_os_variables(source, pattern): + if os.path.isfile(source): + logger.debug(f"Open source file {source}") + source_env = {} + try: + proc = sp.Popen( + ["bash", "-c", f"source {source} && env"], + stdout=sp.PIPE, + stderr=sp.PIPE, + ) + # Load variables + for tup in [s.decode("utf-8").strip().split("=", 1) for s in proc.stdout]: + name = tup[0].strip() + value = tup[1].strip() + if pattern in name: + source_env[name] = value + finally: + proc.stdout.close() + return source_env + else: + logger.error("File does not exist") + return {} + + +def get_var(match_re): + """ + Show the version of this package + + :return: Version number + :rtype: string + """ + # Load version package + with open( + os.path.join(os.path.abspath(os.path.dirname(__file__)), "../", "__init__.py") + ) as fp: + match = match_re.match(fp.read()) + value = ( + match.group(1) + if match + else "".join(choice(ascii_letters) for i in range(16)) + ) + return value + + +def get_uptime(): + """Read uptime system + http://planzero.org/blog/2012/01/26/system_uptime_in_python,_a_better_way + """ + with open("/proc/uptime") as f: + uptime_seconds = float(f.readline().split()[0]) + return uptime_seconds + + +def status_disk(folder="/var/"): + disk = os.statvfs(folder) + # Evaluate the total space in GB + total_space = float(disk.f_bsize * disk.f_blocks) / 1024 / 1024 / 1024 + # Evaluate total used space in GB + total_used_space = ( + float(disk.f_bsize * (disk.f_blocks - disk.f_bfree)) / 1024 / 1024 / 1024 + ) + # Evaluate total available space in GB + total_avail_space = float(disk.f_bsize * disk.f_bfree) / 1024 / 1024 / 1024 + # Evaluate total non super-user space in GB + total_avail_space_non_root = ( + float(disk.f_bsize * disk.f_bavail) / 1024 / 1024 / 1024 + ) + return { + "total": total_space, + "used": total_used_space, + "available": total_avail_space, + "available_no_root": total_avail_space_non_root, + "unit": "G", + } + + +def get_local_interfaces(): + """Returns a dictionary of name:ip key value pairs. + - Reference: + * http://code.activestate.com/recipes/439093/#c1 + * https://gist.github.com/pklaus/289646 + """ + # Read hostname + hostname = socket.gethostname() + # Make a dgram socket to use as our file descriptor that we'll operate on. + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # Make a byte array with our fill character. + names = array.array("B", MAX_BYTES * FILL_CHAR) + # Get the address of our names byte array for use in our struct. + names_address, names_length = names.buffer_info() + # Create a mutable byte buffer to store the data in + mutable_byte_buffer = struct.pack("iL", MAX_BYTES, names_address) + # mutate our mutable_byte_buffer with the results of get_iface_list. + # NOTE: mutated_byte_buffer is just a reference to mutable_byte_buffer - for the sake of clarity we've defined them as + # separate variables, however they are the same address space - that's how fcntl.ioctl() works since the mutate_flag=True + # by default. + mutated_byte_buffer = fcntl.ioctl(sock.fileno(), SIOCGIFCONF, mutable_byte_buffer) + # Close socket + sock.close() + # Get our max_bytes of our mutated byte buffer that points to the names variable address space. + max_bytes_out, names_address_out = struct.unpack("iL", mutated_byte_buffer) + # Convert names to a bytes array - keep in mind we've mutated the names array, so now our bytes out should represent + # the bytes results of the get iface list ioctl command. + namestr = bytearray(names) + # Each entry is 40 bytes long. The first 16 bytes are the name string. + # the 20-24th bytes are IP address octet strings in byte form - one for each byte. + # Don't know what 17-19 are, or bytes 25:40. + ip_dict = {} + for i in range(0, max_bytes_out, 40): + name = namestr[i : i + 16].split(FILL_CHAR, 1)[0] + name = name.decode("utf-8") + ip_bytes = namestr[i + 20 : i + 24] + full_addr = [] + for netaddr in ip_bytes: + if isinstance(netaddr, int): + full_addr.append(str(netaddr)) + elif isinstance(netaddr, str): + full_addr.append(str(ord(netaddr))) + ip_dict[name] = ".".join(full_addr) + # Remove loopback interface is in list + if "lo" in ip_dict: + del ip_dict["lo"] + return {"hostname": hostname, "interfaces": ip_dict} + + +def get_key(): + return str(b64encode(get_var(AUTH_RE).encode("utf-8"))) diff --git a/src/qoa4ml/utils/jetson/exceptions.py b/src/qoa4ml/utils/jetson/exceptions.py new file mode 100644 index 0000000..a5f776e --- /dev/null +++ b/src/qoa4ml/utils/jetson/exceptions.py @@ -0,0 +1,16 @@ +class JtopError(Exception): + """ + raise when jtop fail. The message attached show the reason. + """ + + def __init__(self, message, errors=""): + super().__init__(message, errors) + # Now for your custom code... + self.message = message + self.errors = errors + + def __repr__(self): + return str(self.message) + + def __str__(self): + return str(self.message) diff --git a/src/qoa4ml/utils/jetson/jetson_gpu.py b/src/qoa4ml/utils/jetson/jetson_gpu.py new file mode 100644 index 0000000..61a287f --- /dev/null +++ b/src/qoa4ml/utils/jetson/jetson_gpu.py @@ -0,0 +1,340 @@ +# Logging +import logging +import os + +from .command import Command +from .common import GenericInterface, cat +from .exceptions import JtopError + +# Create logger +logger = logging.getLogger(__name__) +# default ipgu path for Jetson devices +DEFAULT_IGPU_PATH = "/sys/class/devfreq/" + + +def check_nvidia_smi(): + cmd = Command(["nvidia-smi"]) + try: + cmd() + return True + except (OSError, Command.CommandError): + pass + return False + + +def igpu_read_freq(path): + # Read status online + gpu = {} + # Check if access to this file + if os.access(path + "/governor", os.R_OK): + with open(path + "/governor") as f: + # Write current engine + gpu["governor"] = f.read().strip() + # Check if access to this file + if os.access(path + "/cur_freq", os.R_OK): + with open(path + "/cur_freq") as f: + # Write current engine + gpu["cur"] = int(f.read()) // 1000 + # Decode clock rate + if os.access(path + "/max_freq", os.R_OK): + with open(path + "/max_freq") as f: + # Write status engine + gpu["max"] = int(f.read()) // 1000 + if os.access(path + "/min_freq", os.R_OK): + with open(path + "/min_freq") as f: + # Write status engine + gpu["min"] = int(f.read()) // 1000 + # Read GPC status + for idx in range(2): + # Read power control status + path_gpc = f"/sys/kernel/debug/bpmp/debug/clk/nafll_gpc{idx}/pto_counter" + if os.access(path_gpc, os.R_OK): + with open(path_gpc) as f: + # List all frequencies + if "GPC" not in gpu: + gpu["GPC"] = [] + gpu["GPC"] += [int(f.read()) // 1000] + return gpu + + +def igpu_read_status(path): + gpu = {} + # GPU status + if os.access(path + "/railgate_enable", os.R_OK): + with open(path + "/railgate_enable") as f: + # Read status railgate + gpu["railgate"] = int(f.read()) == 1 + # Mask status (Useful for nvpmodel) + if os.access(path + "/tpc_pg_mask", os.R_OK): + with open(path + "/tpc_pg_mask") as f: + # Read status TPG PG mask + gpu["tpc_pg_mask"] = int(f.read()) == 1 + # Status 3D scaling + if os.access(path + "/enable_3d_scaling", os.R_OK): + with open(path + "/enable_3d_scaling") as f: + # Read status 3D scaling + gpu["3d_scaling"] = int(f.read()) == 1 + # Current load GPU + if os.access(path + "/load", os.R_OK): + with open(path + "/load") as f: + # Read current GPU load + gpu["load"] = float(f.read()) / 10.0 + return gpu + + +def get_raw_igpu_devices(): + igpu_path = DEFAULT_IGPU_PATH + raw_output = {} + for item in os.listdir(igpu_path): + item_path = os.path.join(igpu_path, item) + if os.path.isfile(item_path) or os.path.islink(item_path): + # Check name device + name_path = f"{item}/device/of_node/name".format(item=item_path) + if os.path.isfile(name_path): + # Decode name + name = cat(name_path) + # path and file + raw_output[name_path] = f"{name}" + return raw_output + + +def find_igpu(igpu_path): + # Check if exist a integrated gpu + # if not os.path.exists("/dev/nvhost-gpu") and not os.path.exists("/dev/nvhost-power-gpu"): + # return [] + igpu = {} + if not os.path.isdir(igpu_path): + logger.error(f"Folder {igpu_path} doesn't exist") + return igpu + for item in os.listdir(igpu_path): + item_path = os.path.join(igpu_path, item) + if os.path.isfile(item_path) or os.path.islink(item_path): + # Check name device + name_path = f"{item_path}/device/of_node/name" + if os.path.isfile(name_path): + # Decode name + name = cat(name_path) + # Check if gpu + if name in ["gv11b", "gp10b", "ga10b", "gpu"]: + # Extract real path GPU device + path = os.path.realpath(os.path.join(item_path, "device")) + frq_path = os.path.realpath(item_path) + igpu[name] = { + "type": "integrated", + "path": path, + "frq_path": frq_path, + } + logger.info(f'GPU "{name}" status in {path}') + logger.info(f'GPU "{name}" frq in {path}') + # Check if railgate exist + path_railgate = os.path.join(path, "railgate_enable") + if os.path.isfile(path_railgate): + igpu[name]["railgate"] = path_railgate + # Check if 3d scaling exist + path_3d_scaling = os.path.join(path, "enable_3d_scaling") + if os.path.isfile(path_3d_scaling): + igpu[name]["3d_scaling"] = path_3d_scaling + else: + logger.debug(f"Skipped {name}") + return igpu + + +def find_dgpu(): + # Check if there are discrete gpu + # if not os.path.exists("/dev/nvidiactl") and not os.path.isdir("/dev/nvgpu-pci"): + # return [] + # https://enterprise-support.nvidia.com/s/article/Useful-nvidia-smi-Queries-2 + dgpu = {} + if check_nvidia_smi(): + logger.info("NVIDIA SMI exist!") + if dgpu: + logger.info("Discrete GPU found") + return dgpu + + +class GPU(GenericInterface): + """ + This class get the output from your GPU, this class is readable like a dictionary, + please read the documentation on :py:attr:`~jtop.jtop.gpu` but is also usable to enable, disable 3d scaling on your device. + + .. code-block:: python + + with jtop() as jetson: + if jetson.ok(): + jetson.gpu.set_scaling_3D("gpu", True) + + Below all methods available using the :py:attr:`~jtop.jtop.gpu` attribute + """ + + def __init__(self): + super(__class__, self).__init__() + + def set_scaling_3d(self, name, value): + """ + Enable disable GPU 3D scaling. this method send a command like below on your Jetson. + + Set 3D scaling on your board, like the command below. To know the GPU name use :py:attr:`~jtop.jtop.gpu` + + .. code-block:: python + + with jtop() as jetson: + if jetson.ok(): + jetson.gpu.set_scaling_3D("ga10b", True) + + is equivalent to: + + .. code-block:: bash + :class: no-copybutton + + echo 1 > /sys/devices/17000000.ga10b/enable_3d_scaling + + :param name: GPU name + :type name: str + :param value: Enable/Disable 3D scaling + :type value: bool + :raises JtopException: if GPU doesn't exist + """ + if name not in self._data: + raise JtopError(f'GPU "{name}" does not exist') + # Set new 3D scaling + self._controller.put( + {"gpu": {"command": "3d_scaling", "name": name, "value": value}} + ) + + def get_scaling_3d(self, name): + """ + Return status of 3D scaling, this output is also readable from :py:attr:`~jtop.jtop.gpu` attribute + + :param name: GPU name + :type name: str + :raises JtopException: if GPU doesn't exist + :return: status 3D scaling + :rtype: bool + """ + if name not in self._data: + raise JtopError(f'GPU "{name}" does not exist') + return self._data[name]["status"]["3d_scaling"] + + @property + def scaling_3d(self): + """ + Return status of 3D scaling, this output is also readable from :py:attr:`~jtop.jtop.gpu` attribute + + .. code-block:: python + + with jtop() as jetson: + if jetson.ok(): + # Set new 3D scaling + jetson.gpu.set_scaling_3D = True + # same of + jetson.gpu.set_scaling_3D("ga10b", True) + + :raises JtopException: if there are no integrated GPU + :return: status 3D scaling + :rtype: bool + """ + # Get first integrated gpu + name = self._get_first_integrated_gpu() + if not name: + raise JtopError("no Integrated GPU available") + return self.get_scaling_3d(name) + + @scaling_3d.setter + def scaling_3d(self, value): + # Get first integrated gpu + name = self._get_first_integrated_gpu() + if not name: + raise JtopError("no Integrated GPU available") + self.set_scaling_3d(name, value) + + def set_railgate(self, name, value): + if name not in self._data: + raise JtopError(f'GPU "{name}" does not exist') + # Set new 3D scaling + self._controller.put( + {"gpu": {"command": "railgate", "name": name, "value": value}} + ) + + def get_railgate(self, name): + if name not in self._data: + raise JtopError(f'GPU "{name}" does not exist') + return self._data[name]["status"]["railgate"] + + def _get_first_integrated_gpu(self): + for name in self._data: + if self._data[name]["type"] == "integrated": + return name + return "" + + +class GPUService: + def __init__(self): + # Detect integrated GPU + igpu_path = DEFAULT_IGPU_PATH + if os.getenv("JTOP_TESTING", False): + igpu_path = "/fake_sys/class/devfreq/" + logger.warning(f"Running in JTOP_TESTING folder={igpu_path}") + self._gpu_list = find_igpu(igpu_path) + # Find discrete GPU + self._gpu_list.update(find_dgpu()) + # Check status + if not self._gpu_list: + logger.warning("No NVIDIA GPU available") + + def set_scaling_3d(self, name, value): + if name not in self._gpu_list: + logger.error(f'GPU "{name}" does not exist') + return False + if "3d_scaling" not in self._gpu_list[name]: + logger.error(f'GPU "{name}" does not have 3D scaling') + return False + path_3d_scaling = self._gpu_list[name]["3d_scaling"] + string_value = "1" if value else "0" + # Write new status 3D scaling + try: + if os.access(path_3d_scaling, os.W_OK): + with open(path_3d_scaling, "w") as f: + f.write(string_value) + logger.info(f'GPU "{name}" set 3D scaling to {value}') + except OSError as e: + logger.error(f"I cannot set 3D scaling {e}") + + def set_railgate(self, name, value): + if name not in self._gpu_list: + logger.error(f'GPU "{name}" does not exist') + return False + if "railgate" not in self._gpu_list[name]: + logger.error(f'GPU "{name}" does not have railgate') + return False + path_railgate = self._gpu_list[name]["railgate"] + string_value = "1" if value else "0" + # Write new status railgate + try: + if os.access(path_railgate, os.W_OK): + with open(path_railgate, "w") as f: + f.write(string_value) + logger.info(f'GPU "{name}" set railgate to {value}') + except OSError as e: + logger.error(f"I cannot set Railgate {e}") + + def get_status(self): + gpu_list = {} + # Read iGPU frequency + for name, data in self._gpu_list.items(): + # Initialize GPU status + gpu = {"type": data["type"]} + # Detect frequency and load + if gpu["type"] == "integrated": + # Read status GPU + gpu["status"] = igpu_read_status(data["path"]) + # Read frequency + gpu["freq"] = igpu_read_freq(data["frq_path"]) + # Read power control status + if os.access(data["path"] + "/power/control", os.R_OK): + with open(data["path"] + "/power/control") as f: + gpu["power_control"] = f.read().strip() + elif gpu["type"] == "discrete": + logger.info("TODO discrete GPU") + # Load all status in GPU + gpu_list[name] = gpu + return gpu_list