Skip to content

Commit

Permalink
refactor: Enhance daemon functionality and options 🚀
Browse files Browse the repository at this point in the history
- Added `--wait` option to the `ready` command in `cli.py`
- Refined PID file creation logic in `pidfile.py`
- Introduced `portfile.py` for managing communication ports
- Restructured `Daemon` and `Worker` classes to use ports
- Improved health verification logic in `Master` and `Worker`
- Removed deprecated `assert_peers_healthy` functionality
- Updated dependencies in `pyproject.toml` to latest versions

Files changed:
- `cli.py`
- `daemon.py`
- `daemonize.py`
- `ensure_pressed.py`
- `healthy.py` (deleted)
- `master.py`
- `pidfile.py`
- `portfile.py` (new)
- `possess.py`
- `tcp.py`
- `worker.py`
- `pyproject.toml`
- `test_daemon.py`
  • Loading branch information
horta committed Jan 22, 2025
1 parent 227c585 commit cbe9228
Show file tree
Hide file tree
Showing 13 changed files with 97 additions and 142 deletions.
13 changes: 9 additions & 4 deletions h3daemon/h3daemon/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from h3daemon.daemon import Daemon
from h3daemon.daemonize import spawn
from h3daemon.pidfile import create_pidfile
from h3daemon import possess

__all__ = ["app"]

Expand All @@ -27,6 +28,7 @@
O_STDIN = typer.Option(None, "--stdin")
O_STDOUT = typer.Option(None, "--stdout")
O_STDERR = typer.Option(None, "--stderr")
O_WAIT = typer.Option(False, "--wait")


@app.callback(invoke_without_command=True)
Expand Down Expand Up @@ -69,17 +71,20 @@ def stop(hmmfile: Path, force: bool = O_FORCE):
Stop daemon.
"""
hmm = HMMFile(path=hmmfile)
pidfile = create_pidfile(hmm.path)
pidfile = create_pidfile(hmm)
x = Daemon.possess(pidfile)
x.shutdown(force=force)


@app.command()
def ready(hmmfile: Path):
def ready(
hmmfile: Path,
wait: bool = O_WAIT,
):
"""
Check if h3daemon is running and ready.
"""
file = HMMFile(path=hmmfile)
pidfile = create_pidfile(file.path)
x = Daemon.possess(pidfile)
pidfile = create_pidfile(file)
x = possess(pidfile, wait=wait)
echo(x.healthy())
44 changes: 15 additions & 29 deletions h3daemon/h3daemon/daemon.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import platform
from contextlib import contextmanager, suppress

import psutil
Expand All @@ -9,9 +8,8 @@
from h3daemon.debug import debug_exception, debug_message
from h3daemon.ensure_pressed import ensure_pressed
from h3daemon.errors import ChildNotFoundError, PIDNotFoundError
from h3daemon.healthy import assert_peers_healthy
from h3daemon.master import Master
from h3daemon.port import find_free_port
from h3daemon.portfile import read_portfile
from h3daemon.worker import Worker

__all__ = ["Daemon", "daemon_context"]
Expand Down Expand Up @@ -41,26 +39,23 @@ def __init__(self, master: Master, worker: Worker, process: psutil.Process | Non
debug_message(f"Daemon.__init__ PID: {process.pid}")

@classmethod
def spawn(cls, hmmfile: HMMFile, cport: int = 0, wport: int = 0):
def create(cls, hmmfile: HMMFile, cport: int, wport: int):
ensure_pressed(hmmfile)
master: Master | None = None
worker: Worker | None = None

try:
cport = find_free_port() if cport == 0 else cport
wport = find_free_port() if wport == 0 else wport
debug_message(f"Daemon.spawn cport: {cport}")
debug_message(f"Daemon.spawn wport: {wport}")

cmd = Master.cmd(cport, wport, str(hmmfile.path))
master = Master(psutil.Popen(cmd))
master = Master(psutil.Popen(cmd), cport, wport)
master.wait_for_readiness()

cmd = Worker.cmd(wport)
worker = Worker(psutil.Popen(cmd))
worker = Worker(psutil.Popen(cmd), wport)
worker.wait_for_readiness()

if platform.system() != "Darwin":
assert_peers_healthy(master, worker)
debug_message("Daemon.spawn is ready")
except Exception as exception:
debug_exception(exception)
Expand All @@ -74,6 +69,10 @@ def spawn(cls, hmmfile: HMMFile, cport: int = 0, wport: int = 0):

@classmethod
@retry(stop=stop_after_delay(10), wait=wait_exponential(multiplier=0.001))
def possess_wait(cls, pidfile: PIDLockFile):
return Daemon.possess(pidfile)

@classmethod
def possess(cls, pidfile: PIDLockFile):
pid = pidfile.is_locked()
if not pid:
Expand All @@ -85,15 +84,17 @@ def possess(cls, pidfile: PIDLockFile):
masters = [x for x in children if Master.myself(x)]
workers = [x for x in children if Worker.myself(x)]

cport, wport = read_portfile(pidfile)

if len(masters) > 0:
assert len(masters) == 1
master = Master(masters[0])
master = Master(masters[0], cport, wport)
else:
raise ChildNotFoundError("Master not found.")

if len(workers) > 0:
assert len(workers) == 1
worker = Worker(workers[0])
worker = Worker(workers[0], wport)
else:
raise ChildNotFoundError("Worker not found.")

Expand Down Expand Up @@ -122,37 +123,22 @@ def healthy(self) -> bool:
assert self._process.is_running()
assert self._master.healthy()
assert self._worker.healthy()
assert_peers_healthy(self._master, self._worker)
except Exception as exception:
debug_exception(exception)
return False
return True

def port(self) -> int:
self.wait_for_readiness()
master_ports = set(self._master.local_listening_ports())
worker_ports = list(set(self._worker.remote_established_ports()))
debug_message(f"Daemon.port master_ports: {master_ports}")
debug_message(f"Daemon.port worker_ports: {worker_ports}")
if len(worker_ports) != 1:
raise RuntimeError(
f"Expected one remote port ({worker_ports}). Worker might have died."
)
master_ports.remove(worker_ports[0])
if len(master_ports) != 1:
raise RuntimeError(
f"Expected one remaining master port ({master_ports}). Master might have died."
)
return int(list(master_ports)[0])
return self._master.cport()

def join(self):
psutil.wait_procs([self._master.process, self._worker.process])


@contextmanager
def daemon_context(hmmfile: HMMFile, cport: int = 0, wport: int = 0):
ensure_pressed(hmmfile)
x = Daemon.spawn(hmmfile, cport, wport)
x = Daemon.create(hmmfile, cport, wport)
try:
x.wait_for_readiness()
yield x
Expand Down
15 changes: 8 additions & 7 deletions h3daemon/h3daemon/daemonize.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from deciphon_schema import HMMFile

from h3daemon.daemon import Daemon
from h3daemon.ensure_pressed import ensure_pressed
from h3daemon.errors import DaemonAlreadyRunningError
from h3daemon.pidfile import create_pidfile
from h3daemon.port import find_free_port
from h3daemon.portfile import create_portfile

__all__ = ["daemonize", "spawn"]

Expand All @@ -21,7 +22,7 @@ def daemonize(
stderr: Optional[Any] = None,
detach: Optional[bool] = None,
):
pidfile = create_pidfile(hmmfile.path)
pidfile = create_pidfile(hmmfile)
assert pidfile.is_locked() is None
with DaemonContext(
working_directory=str(hmmfile.path.parent),
Expand All @@ -31,11 +32,12 @@ def daemonize(
stdout=stdout,
stderr=stderr,
):
x = Daemon.spawn(hmmfile, cport, wport)
cport = find_free_port() if cport == 0 else cport
wport = find_free_port() if wport == 0 else wport
create_portfile(pidfile, cport, wport)
x = Daemon.create(hmmfile, cport, wport)
x.join()

return pidfile


def spawn(
hmmfile: HMMFile,
Expand All @@ -47,8 +49,7 @@ def spawn(
detach: Optional[bool] = None,
force: Optional[bool] = False,
):
ensure_pressed(hmmfile)
pidfile = create_pidfile(hmmfile.path)
pidfile = create_pidfile(hmmfile)
if pidfile.is_locked():
if not force:
raise DaemonAlreadyRunningError(f"Daemon for {hmmfile} is already running.")
Expand Down
8 changes: 6 additions & 2 deletions h3daemon/h3daemon/ensure_pressed.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from pathlib import Path
from subprocess import check_call
from subprocess import DEVNULL, check_call

import hmmer
from deciphon_schema import HMMFile
Expand All @@ -15,4 +15,8 @@ def ensure_pressed(hmmfile: HMMFile):
if not filename.exists():
for x in pressed_extensions:
Path(f"{hmmfile.path}.{x}").unlink(True)
check_call([hmmer.path(hmmer.hmmpress), str(hmmfile.path)])
check_call(
[hmmer.path(hmmer.hmmpress), str(hmmfile.path)],
stdout=DEVNULL,
stderr=DEVNULL,
)
24 changes: 0 additions & 24 deletions h3daemon/h3daemon/healthy.py

This file was deleted.

43 changes: 13 additions & 30 deletions h3daemon/h3daemon/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,22 @@
import psutil
from tenacity import retry, stop_after_delay, wait_exponential

from h3daemon.debug import debug_exception, debug_message
from h3daemon.tcp import tcp_connections
from h3daemon.debug import debug_message
from h3daemon.tcp import can_connect

__all__ = ["Master"]


class Master:
def __init__(self, process: psutil.Process):
def __init__(self, process: psutil.Process, cport: int, wport: int):
self._proc = process
self._cport = cport
self._wport = wport
debug_message(f"Master.__init__ PID: {process.pid}")

def cport(self):
return self._cport

@staticmethod
def myself(process: psutil.Process):
return "--master" in process.cmdline()
Expand All @@ -30,34 +35,12 @@ def cmd(cport: int, wport: int, hmmfile: str):
return cmd

def healthy(self):
if not self._proc.is_running():
return False
try:
lports = self.local_listening_ports()
debug_message(f"Master.healthy lports: {lports}")
except Exception as exception:
debug_exception(exception)
return False
return len(lports) > 1
return (
self._proc.is_running()
and can_connect("127.0.0.1", self._cport)
and can_connect("127.0.0.1", self._wport)
)

@retry(stop=stop_after_delay(10), wait=wait_exponential(multiplier=0.001))
def wait_for_readiness(self):
assert self.healthy()

def local_listening_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Master.local_listening_ports connections: {connections}")
connections = [x for x in connections if x.status == "LISTEN"]
return [x.laddr.port for x in connections if x.laddr.ip == "0.0.0.0"]

def local_established_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Master.local_established_ports connections: {connections}")
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.laddr.port for x in connections if x.laddr.ip == "127.0.0.1"]

def remote_established_ports(self):
connections = tcp_connections(self._proc)
debug_message(f"Master.remote_established_ports connections: {connections}")
connections = [x for x in connections if x.status == "ESTABLISHED"]
return [x.raddr.port for x in connections if x.raddr.ip == "127.0.0.1"]
7 changes: 3 additions & 4 deletions h3daemon/h3daemon/pidfile.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
from pathlib import Path

from deciphon_schema import HMMFile
from pidlockfile import PIDLockFile

__all__ = ["create_pidfile"]


def create_pidfile(file: Path, timeout=5):
return PIDLockFile(f"{str(file.absolute())}.pid", timeout=timeout)
def create_pidfile(hmmfile: HMMFile, timeout=5):
return PIDLockFile(f"{str(hmmfile.path.absolute())}.pid", timeout=timeout)
23 changes: 23 additions & 0 deletions h3daemon/h3daemon/portfile.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from pathlib import Path

from pidlockfile import PIDLockFile

__all__ = ["create_portfile", "read_portfile"]


def create_portfile(pidfile: PIDLockFile, cport: int, wport: int):
portfile = _portfile(pidfile)
with open(portfile, "w") as f:
f.write(f"{cport} {wport}\n")
return portfile


def read_portfile(pidfile: PIDLockFile):
portfile = _portfile(pidfile)
with open(portfile, "r") as f:
cport, wport = f.readline().strip().split(" ")
return int(cport), int(wport)


def _portfile(pidfile: PIDLockFile):
return f"{str(Path(pidfile.path).absolute())}.port"
4 changes: 3 additions & 1 deletion h3daemon/h3daemon/possess.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@
__all__ = ["possess"]


def possess(pidfile: PIDLockFile):
def possess(pidfile: PIDLockFile, wait=True):
if wait:
return Daemon.possess_wait(pidfile)
return Daemon.possess(pidfile)
16 changes: 9 additions & 7 deletions h3daemon/h3daemon/tcp.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from psutil import Process
import socket

__all__ = ["tcp_connections"]
__all__ = ["can_connect"]


def tcp_connections(x: Process):
# psutil bug: https://github.com/giampaolo/psutil/issues/2116
with open("/dev/null", "wb"):
connections = x.net_connections(kind="tcp")
return connections
def can_connect(host: str, port: int):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
return True
except ConnectionRefusedError:
return False
Loading

0 comments on commit cbe9228

Please sign in to comment.