Skip to content

Commit

Permalink
add monitoring jetson gpu
Browse files Browse the repository at this point in the history
  • Loading branch information
nguu0123 committed Aug 25, 2024
1 parent 9a1151c commit fa4d5b4
Show file tree
Hide file tree
Showing 5 changed files with 727 additions and 15 deletions.
30 changes: 15 additions & 15 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ classifiers = [
]

dependencies = [
"pika>=1.3.2",
"psutil>=5.9.2",
"PyYAML>=6.0.1",
"requests>=2.31.0",
"fastapi>=0.110.2",
"flatten_dict>=0.4.2",
"pydantic>=2.7.4,<2.8",
"tinyflux>=1.0.0",
"uvicorn>=0.29.0",
"lazy-import>=0.2.2",
"confluent-kafka>=2.4.0",
"docker==7.1.0",
"numpy>=1.23.5",
"pandas~=1.4.3",
"tox-gh-actions>=3.2.0",
"pika>=1.3.2",
"psutil>=5.9.2",
"PyYAML>=6.0.1",
"requests>=2.31.0",
"fastapi>=0.110.2",
"flatten_dict>=0.4.2",
"pydantic>=2.7.4,<2.8",
"tinyflux>=1.0.0",
"uvicorn>=0.29.0",
"lazy-import>=0.2.2",
"confluent-kafka>=2.4.0",
"docker==7.1.0",
"numpy>=1.23.5",
"pandas~=1.4.3",
"tox-gh-actions>=3.2.0",
]

optional-dependencies.ml = [
Expand Down
113 changes: 113 additions & 0 deletions src/qoa4ml/utils/jetson/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import logging
import os

# Launch command
import subprocess as sp
import sys
import threading
from queue import Queue

# Create logger
logger = logging.getLogger(__name__)
EXTRA_TIMEOUT = 1.0
# Reference:
# https://eli.thegreenplace.net/2017/interacting-with-a-long-running-child-process-in-python/
# https://stackoverflow.com/questions/37942022/returncode-of-popen-object-is-none-after-the-process-is-terminated/42376107
# https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
# https://docs.python.org/3/tutorial/errors.html
# https://stackoverflow.com/questions/10756383/timeout-on-subprocess-readline-in-python
# https://stackoverflow.com/questions/3733270/python-subprocess-timeout


class Command:
class CommandError(Exception):
def __init__(self, message, errno):
self.message = message
self.errno = errno

def __str__(self):
return f"[errno:{self.errno}] {self.message}"

class TimeoutError(CommandError):
def __init__(self):
super().__init__("Process does not replied in time", -1)

@staticmethod
def run_command(command, repeat=5, timeout=2):
cmd = Command(command)
for idx in range(repeat):
try:
return cmd(timeout=timeout)
except Command.TimeoutError as error:
logger.error(f"[{idx}] {error}")
raise Command.CommandError(f"Error to start {command}", -2)

def __init__(self, command):
self.process = None
self.command = command

def __call__(self, timeout=None):
def target(out_queue, err_queue):
# Run process
try:
# https://stackoverflow.com/questions/33277452/prevent-unexpected-stdin-reads-and-lock-in-subprocess
self.process = sp.Popen(
self.command,
stdout=sp.PIPE,
stderr=sp.PIPE,
stdin=open(os.devnull),
preexec_fn=os.setsid,
)
# Read lines output
for line in iter(self.process.stdout.readline, b""):
try:
line = line.decode("utf-8")
line = str(line.strip())
except UnicodeEncodeError:
line = line.encode("ascii", "ignore").decode("ascii")
out_queue.put(line)
# Close and terminate
self.process.stdout.close()
self.process.wait()
except Exception:
# Store error message
err_queue.put(sys.exc_info())

# Initialize lists
is_timeout = False
out_queue = Queue()
err_queue = Queue()
thread = threading.Thread(
target=target,
args=(
out_queue,
err_queue,
),
)
thread.start()
# Wait timeout process
thread.join(timeout)
if thread.is_alive():
logger.error(f"Terminating process: {self.command}")
if self.process is not None:
self.process.terminate()
thread.join(timeout=EXTRA_TIMEOUT)
logger.warning(f"Process terminated: {self.command}")
is_timeout = True
# Read the output
# Extract exception and raise
if not err_queue.empty():
ex_type, ex_value, tb_str = err_queue.get()
ex_value.__traceback__ = tb_str
raise ex_value
if is_timeout:
raise Command.TimeoutError()
if self.process.returncode != 0:
raise Command.CommandError("Error process:", self.process.returncode)
return list(out_queue.queue)

def communicate(self, timeout=None):
self.__call__(timeout=timeout)


# EOF
243 changes: 243 additions & 0 deletions src/qoa4ml/utils/jetson/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
import array
import fcntl

# Logging
import logging
import os
import re

# Socket and IP information
import socket
import struct

# Launch command
import subprocess as sp
from base64 import b64encode
from random import choice
from string import ascii_letters

# Load Author
AUTH_RE = re.compile(r""".*__author__ = ["'](.*?)['"]""", re.S)
# Create logger
logger = logging.getLogger(__name__)

# Max possible bytes for interface result. Will truncate if more than 4096 characters to describe interfaces.
MAX_BYTES = 4096
# We're going to make a blank byte array to operate on. This is our fill char.
FILL_CHAR = b"\0"
# Command defined in ioctl.h for the system operation for get iface list
# Defined at https://code.woboq.org/qt5/include/bits/ioctls.h.html under
# /* Socket configuration controls. */ section.
SIOCGIFCONF = 0x8912


class GenericInterface:
def __init__(self):
self._controller = None
self._init = None
self._data = {}

def _initialize(self, controller, init=None):
if init is None:
init = {}
self._controller = controller
self._init = init

def _update(self, data):
self._data = data

def items(self):
return self._data.items()

def keys(self):
return self._data.keys()

def values(self):
return self._data.values()

def get(self, key, default=None):
return self._data.get(key, default)

def __len__(self):
return len(self._data)

def __getitem__(self, key):
return self._data[key]

def __contains__(self, key):
return key in self._data

def __iter__(self):
return iter(self._data)

def __reversed__(self):
return reversed(self._data)

def __missing__(self, key):
raise KeyError(key)

def __eq__(self, other):
if isinstance(other, GenericInterface):
return self._data == other._data
elif isinstance(other, dict):
return self._data == other
else:
return NotImplemented

def __ne__(self, other):
result = self.__eq__(other)
if result is NotImplemented:
return result
else:
return not result

def __str__(self):
return str(self._data)

def __repr__(self):
return repr(self._data)


def check_file(path):
return os.path.isfile(path) and os.access(path, os.R_OK)


def cat(path):
with open(path) as f:
return f.readline().rstrip("\x00")


def locate_commands(name, commands):
for cmd in commands:
if os.path.exists(cmd):
return cmd
return None


def import_os_variables(source, pattern):
if os.path.isfile(source):
logger.debug(f"Open source file {source}")
source_env = {}
try:
proc = sp.Popen(
["bash", "-c", f"source {source} && env"],
stdout=sp.PIPE,
stderr=sp.PIPE,
)
# Load variables
for tup in [s.decode("utf-8").strip().split("=", 1) for s in proc.stdout]:
name = tup[0].strip()
value = tup[1].strip()
if pattern in name:
source_env[name] = value
finally:
proc.stdout.close()
return source_env
else:
logger.error("File does not exist")
return {}


def get_var(match_re):
"""
Show the version of this package
:return: Version number
:rtype: string
"""
# Load version package
with open(
os.path.join(os.path.abspath(os.path.dirname(__file__)), "../", "__init__.py")
) as fp:
match = match_re.match(fp.read())
value = (
match.group(1)
if match
else "".join(choice(ascii_letters) for i in range(16))
)
return value


def get_uptime():
"""Read uptime system
http://planzero.org/blog/2012/01/26/system_uptime_in_python,_a_better_way
"""
with open("/proc/uptime") as f:
uptime_seconds = float(f.readline().split()[0])
return uptime_seconds


def status_disk(folder="/var/"):
disk = os.statvfs(folder)
# Evaluate the total space in GB
total_space = float(disk.f_bsize * disk.f_blocks) / 1024 / 1024 / 1024
# Evaluate total used space in GB
total_used_space = (
float(disk.f_bsize * (disk.f_blocks - disk.f_bfree)) / 1024 / 1024 / 1024
)
# Evaluate total available space in GB
total_avail_space = float(disk.f_bsize * disk.f_bfree) / 1024 / 1024 / 1024
# Evaluate total non super-user space in GB
total_avail_space_non_root = (
float(disk.f_bsize * disk.f_bavail) / 1024 / 1024 / 1024
)
return {
"total": total_space,
"used": total_used_space,
"available": total_avail_space,
"available_no_root": total_avail_space_non_root,
"unit": "G",
}


def get_local_interfaces():
"""Returns a dictionary of name:ip key value pairs.
- Reference:
* http://code.activestate.com/recipes/439093/#c1
* https://gist.github.com/pklaus/289646
"""
# Read hostname
hostname = socket.gethostname()
# Make a dgram socket to use as our file descriptor that we'll operate on.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Make a byte array with our fill character.
names = array.array("B", MAX_BYTES * FILL_CHAR)
# Get the address of our names byte array for use in our struct.
names_address, names_length = names.buffer_info()
# Create a mutable byte buffer to store the data in
mutable_byte_buffer = struct.pack("iL", MAX_BYTES, names_address)
# mutate our mutable_byte_buffer with the results of get_iface_list.
# NOTE: mutated_byte_buffer is just a reference to mutable_byte_buffer - for the sake of clarity we've defined them as
# separate variables, however they are the same address space - that's how fcntl.ioctl() works since the mutate_flag=True
# by default.
mutated_byte_buffer = fcntl.ioctl(sock.fileno(), SIOCGIFCONF, mutable_byte_buffer)
# Close socket
sock.close()
# Get our max_bytes of our mutated byte buffer that points to the names variable address space.
max_bytes_out, names_address_out = struct.unpack("iL", mutated_byte_buffer)
# Convert names to a bytes array - keep in mind we've mutated the names array, so now our bytes out should represent
# the bytes results of the get iface list ioctl command.
namestr = bytearray(names)
# Each entry is 40 bytes long. The first 16 bytes are the name string.
# the 20-24th bytes are IP address octet strings in byte form - one for each byte.
# Don't know what 17-19 are, or bytes 25:40.
ip_dict = {}
for i in range(0, max_bytes_out, 40):
name = namestr[i : i + 16].split(FILL_CHAR, 1)[0]
name = name.decode("utf-8")
ip_bytes = namestr[i + 20 : i + 24]
full_addr = []
for netaddr in ip_bytes:
if isinstance(netaddr, int):
full_addr.append(str(netaddr))
elif isinstance(netaddr, str):
full_addr.append(str(ord(netaddr)))
ip_dict[name] = ".".join(full_addr)
# Remove loopback interface is in list
if "lo" in ip_dict:
del ip_dict["lo"]
return {"hostname": hostname, "interfaces": ip_dict}


def get_key():
return str(b64encode(get_var(AUTH_RE).encode("utf-8")))
Loading

0 comments on commit fa4d5b4

Please sign in to comment.