Skip to content

Commit

Permalink
Refactor Daemon management and improve code structure 🛠️
Browse files Browse the repository at this point in the history
- Deleted h3daemon/TODO.txt file.
- Renamed h3daemon/manager.py to h3daemon/daemon.py.
- Updated imports in __init__.py, cli.py to reflect new module updates.
- Introduced h3daemon/daemonize.py and h3daemon/possess.py for better clarity.
- Refactored daemon management logic for improved reliability.
- Enhanced error handling with DaemonAlreadyRunningError, PIDNotFoundError.
- Introduced Polling class for better retry logic than tenacity.
- Simplified health checks by separating logic into h3daemon/healthy.py.

This comprehensive overhaul streamlines the daemon process management, enhances code maintainability, and removes legacy dependencies. By modularizing components and introducing robust polling mechanisms, the code now supports more efficient daemon controls and error handling.
  • Loading branch information
horta committed Dec 5, 2024
1 parent c7ff7ef commit 6b7f72d
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 125 deletions.
1 change: 0 additions & 1 deletion h3daemon/TODO.txt

This file was deleted.

6 changes: 6 additions & 0 deletions h3daemon/h3daemon/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from h3daemon.daemon import Daemon
from h3daemon.daemonize import spawn
from h3daemon.ensure_pressed import ensure_pressed
from h3daemon.possess import possess

__all__ = ["Daemon", "ensure_pressed", "spawn", "possess"]
61 changes: 14 additions & 47 deletions h3daemon/h3daemon/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
from typing import Optional

import typer
from daemon import DaemonContext
from deciphon_schema import HMMFile
from typer import echo

from h3daemon.ensure_pressed import ensure_pressed
from h3daemon.manager import Manager
from h3daemon.daemon import Daemon
from h3daemon.daemonize import spawn
from h3daemon.pidfile import create_pidfile
from h3daemon.port import find_free_port

__all__ = ["app"]

Expand All @@ -30,9 +28,6 @@
O_STDIN = typer.Option(None, "--stdin")
O_STDOUT = typer.Option(None, "--stdout")
O_STDERR = typer.Option(None, "--stderr")
O_DETACH = typer.Option(
None, "--detach/--no-detach", show_default=False, help="[default: None]"
)


@app.callback(invoke_without_command=True)
Expand All @@ -50,39 +45,19 @@ def start(
stdout: Optional[Path] = O_STDOUT,
stderr: Optional[Path] = O_STDERR,
force: bool = O_FORCE,
detach: Optional[bool] = O_DETACH,
):
"""
Start daemon.
"""
hmm = HMMFile(path=hmmfile)
ensure_pressed(hmm)
pidfile = create_pidfile(hmm.path)
if pid := pidfile.is_locked():
if not force:
echo(f"Daemon for {hmmfile} is already running.")
raise typer.Exit(1)
echo("Cleaning up previous daemon...")
x = Manager.possess(pid)
x.shutdown(force=force)

cport = find_free_port() if port == 0 else port
wport = find_free_port()
fin = open(stdin, "r") if stdin else stdin
fout = open(stdout, "w+") if stdout else stdout
ferr = open(stderr, "w+") if stderr else stderr

assert pidfile.is_locked() is None
with DaemonContext(
working_directory=str(hmm.path.parent),
pidfile=pidfile,
detach_process=detach,
stdin=fin,
stdout=fout,
stderr=ferr,
):
x = Manager.spawn(hmm, cport, wport)
x.join()
spawn(
hmm,
cport=port,
stdin=stdin,
stdout=stdout,
stderr=stderr,
force=force,
)


@app.command()
Expand All @@ -92,12 +67,8 @@ def stop(hmmfile: Path, force: bool = O_FORCE):
"""
hmm = HMMFile(path=hmmfile)
pidfile = create_pidfile(hmm.path)
if pid := pidfile.is_locked():
x = Manager.possess(pid)
x.shutdown(force=force)
else:
echo("No process associated with the PID file.")
raise typer.Exit(1)
x = Daemon.possess(pidfile)
x.shutdown(force=force)


@app.command()
Expand All @@ -107,9 +78,5 @@ def ready(hmmfile: Path, wait: bool = O_WAIT):
"""
file = HMMFile(path=hmmfile)
pidfile = create_pidfile(file.path)
if pid := pidfile.is_locked():
x = Manager.possess(pid)
echo(x.healthy())
else:
echo(f"Failed to possess {hmmfile}. Have you started the daemon?")
raise typer.Exit(1)
x = Daemon.possess(pidfile)
echo(x.healthy())
52 changes: 13 additions & 39 deletions h3daemon/h3daemon/manager.py → h3daemon/h3daemon/daemon.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import os
import time
import traceback
from contextlib import suppress

import psutil
from deciphon_schema import HMMFile
from tenacity import Retrying, stop_after_delay, wait_exponential
from pidlockfile import PIDLockFile

from h3daemon.errors import ChildNotFoundError
from h3daemon.errors import ChildNotFoundError, PIDNotFoundError
from h3daemon.healthy import assert_peers_healthy
from h3daemon.master import Master
from h3daemon.port import find_free_port
from h3daemon.worker import Worker

__all__ = ["Manager"]
__all__ = ["Daemon"]


def shutdown(x: psutil.Process, force: bool):
Expand All @@ -28,34 +28,6 @@ def shutdown(x: psutil.Process, force: bool):
shutdown(x, True)


def assert_healthy(master: Master, worker: Worker):
try:
assert master.is_running()
assert worker.is_running()
master_listen = master.local_listening_ports()
master_lport = master.local_established_ports()
master_rport = master.remote_established_ports()
worker_lport = worker.local_established_ports()
worker_rport = worker.remote_established_ports()

assert len(master_lport) == 1
assert len(worker_rport) == 1
assert master_lport[0] == worker_rport[0]
assert len(master_rport) == 1
assert len(worker_lport) == 1
assert master_rport[0] == worker_lport[0]
assert len(master_listen) == 2
master_ports = set(master_listen)
assert len(master_ports) == 2
master_ports.remove(worker_rport[0])
assert len(master_ports) == 1
except RuntimeError as exception:
if not exception.args[0] == "proc_pidinfo(PROC_PIDLISTFDS) 2/2 syscall failed":
raise exception
# psutil bug: https://github.com/giampaolo/psutil/issues/2116
time.sleep(0.1)


def debug_exception(exception: Exception):
if os.environ.get("H3DAEMON_DEBUG", 0):
with open("h3daemon_debug.txt", "a") as f:
Expand All @@ -70,7 +42,7 @@ def debug_msg(msg: str):
f.write(f"{msg}\n")


class Manager:
class Daemon:
def __init__(self, master: Master, worker: Worker, process: psutil.Process | None):
self._master = master
self._worker = worker
Expand All @@ -93,7 +65,7 @@ def spawn(cls, hmmfile: HMMFile, cport: int = 0, wport: int = 0):
worker = Worker(psutil.Popen(cmd))
worker.wait_for_readiness()

assert_healthy(master, worker)
assert_peers_healthy(master, worker)
except Exception as exception:
debug_exception(exception)
if master:
Expand All @@ -105,7 +77,10 @@ def spawn(cls, hmmfile: HMMFile, cport: int = 0, wport: int = 0):
return cls(master, worker, None)

@classmethod
def possess(cls, pid: int):
def possess(cls, pidfile: PIDLockFile):
pid = pidfile.is_locked()
if not pid:
raise PIDNotFoundError("PID not in pidfile.")
process = psutil.Process(pid)
children = process.children()

Expand Down Expand Up @@ -133,16 +108,15 @@ def shutdown(self, force=False):

def healthy(self) -> bool:
try:
assert_healthy(self._master, self._worker)
if self._process:
assert self._process.is_running()
assert_peers_healthy(self._master, self._worker)
except Exception as exception:
debug_exception(exception)
return False
return True

def port(self) -> int:
for attempt in Retrying(stop=stop_after_delay(10), wait=wait_exponential()):
with attempt:
assert self.healthy
master_ports = set(self._master.local_listening_ports())
master_ports.remove(self._worker.remote_established_ports()[0])
return int(list(master_ports)[0])
Expand Down
65 changes: 65 additions & 0 deletions h3daemon/h3daemon/daemonize.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from multiprocessing import Process
from typing import Any, Optional

from daemon import DaemonContext
from deciphon_schema import HMMFile

from h3daemon.daemon import Daemon
from h3daemon.ensure_pressed import ensure_pressed
from h3daemon.errors import DaemonAlreadyRunningError
from h3daemon.pidfile import create_pidfile

__all__ = ["daemonize", "spawn"]


def daemonize(
hmmfile: HMMFile,
cport: int = 0,
wport: int = 0,
stdin: Optional[Any] = None,
stdout: Optional[Any] = None,
stderr: Optional[Any] = None,
detach: Optional[bool] = None,
):
ensure_pressed(hmmfile)
fin = open(stdin, "r") if stdin else stdin
fout = open(stdout, "w+") if stdout else stdout
ferr = open(stderr, "w+") if stderr else stderr

pidfile = create_pidfile(hmmfile.path)
assert pidfile.is_locked() is None
with DaemonContext(
working_directory=str(hmmfile.path.parent),
pidfile=pidfile,
detach_process=True,
stdin=fin,
stdout=fout,
stderr=ferr,
):
x = Daemon.spawn(hmmfile, cport, wport)
x.join()

return pidfile


def spawn(
hmmfile: HMMFile,
cport: int = 0,
wport: int = 0,
stdin: Optional[Any] = None,
stdout: Optional[Any] = None,
stderr: Optional[Any] = None,
force: Optional[bool] = False,
):
pidfile = create_pidfile(hmmfile.path)
if pidfile.is_locked():
if not force:
raise DaemonAlreadyRunningError(f"Daemon for {hmmfile} is already running.")
x = Daemon.possess(pidfile)
x.shutdown(force=force)

args = (hmmfile, cport, wport, stdin, stdout, stderr)
p = Process(target=daemonize, args=args, daemon=True)
p.start()
p.join()
return pidfile
12 changes: 11 additions & 1 deletion h3daemon/h3daemon/errors.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
__all__ = ["ChildNotFoundError"]
__all__ = ["ChildNotFoundError", "DaemonAlreadyRunningError", "PIDNotFoundError"]


class ChildNotFoundError(RuntimeError):
def __str__(self):
return repr(self)


class DaemonAlreadyRunningError(RuntimeError):
def __str__(self):
return repr(self)


class PIDNotFoundError(RuntimeError):
def __str__(self):
return repr(self)
24 changes: 24 additions & 0 deletions h3daemon/h3daemon/healthy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from h3daemon.master import Master
from h3daemon.worker import Worker


def assert_peers_healthy(master: Master, worker: Worker):
assert master.healthy()
assert worker.healthy()
master_listen = master.local_listening_ports()
master_lport = master.local_established_ports()
master_rport = master.remote_established_ports()
worker_lport = worker.local_established_ports()
worker_rport = worker.remote_established_ports()

assert len(master_lport) == 1
assert len(worker_rport) == 1
assert master_lport[0] == worker_rport[0]
assert len(master_rport) == 1
assert len(worker_lport) == 1
assert master_rport[0] == worker_lport[0]
assert len(master_listen) == 2
master_ports = set(master_listen)
assert len(master_ports) == 2
master_ports.remove(worker_rport[0])
assert len(master_ports) == 1
30 changes: 11 additions & 19 deletions h3daemon/h3daemon/master.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from __future__ import annotations

import time

import hmmer
import psutil
from tenacity import Retrying, stop_after_delay, wait_exponential

from h3daemon.polling import Polling
from h3daemon.tcp import tcp_connections

__all__ = ["Master"]

Expand All @@ -23,36 +23,28 @@ def cmd(cport: int, wport: int, hmmfile: str):
cmd += ["--cport", str(cport), "--wport", str(wport)]
return cmd

def is_ready(self):
if not self.is_running():
def healthy(self):
if not self._proc.is_running():
return False
try:
lports = self.local_listening_ports()
except RuntimeError:
# psutil bug: https://github.com/giampaolo/psutil/issues/2116
time.sleep(1)
lports = [-1, -1]
lports = self.local_listening_ports()
return len(lports) > 1

def wait_for_readiness(self):
for attempt in Retrying(stop=stop_after_delay(10), wait=wait_exponential()):
for attempt in Polling():
with attempt:
assert self.is_ready()

def is_running(self):
return self._proc.is_running()
assert self.healthy()

def local_listening_ports(self):
connections = self._proc.net_connections(kind="tcp")
connections = tcp_connections(self._proc)
connections = [x for x in connections if x.status == "LISTEN"]
return [x.laddr.port for x in connections if x.laddr.ip == "0.0.0.0"]

def local_established_ports(self):
connections = self._proc.net_connections(kind="tcp")
connections = tcp_connections(self._proc)
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.laddr.port for x in connections if x.laddr.ip == "127.0.0.1"]

def remote_established_ports(self):
connections = self._proc.net_connections(kind="tcp")
connections = tcp_connections(self._proc)
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.raddr.port for x in connections if x.raddr.ip == "127.0.0.1"]
Loading

0 comments on commit 6b7f72d

Please sign in to comment.