Skip to content

Commit

Permalink
bittensor/axon.py: thread and exception handling
Browse files Browse the repository at this point in the history
Various issues were encountered trying to run and understand e2e tests:
- if uvicorn fails to start, an uncaught exception is emitted to stderr
- axon keeps spinning waiting for self.started, indefinitely
- exceptions are not propagated from threads
- there is no way to (simply) test from the outside whether an axon
  started and/or runs
- axon creates a thread that only creates another thread, which seems
  redundant

This patch addresses some of these issues, in FastAPIThreadedServer:
- add thread-safe set_exception()/get_exception() accessors to record and retrieve exceptions
- run_in_thread() yields the created thread, so that the code using it
  can check whether the thread is alive
- uvicorn.Server.startup() is wrapped to set a thread-safe flag using
  self.set_started(True) to indicate startup succeeded
- run_in_thread() times out after one second to prevent infinite loop in
  case self.get_started() never becomes True
- run_in_thread() raises an exception if it fails to start the thread
- _wrapper_run() tests whether the thread is still alive

and in class axon, the following are added:
- @property axon.exception, returning any exception
- axon.is_running(), returning True when the axon is operational

The seemingly redundant thread is left in until feedback is received on
the reasons for including it.
  • Loading branch information
µ committed Aug 13, 2024
1 parent 028ef8b commit 7bb5ae9
Showing 1 changed file with 77 additions and 6 deletions.
83 changes: 77 additions & 6 deletions bittensor/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import inspect
import json
import os
import socket
import threading
import time
import traceback
Expand Down Expand Up @@ -100,26 +101,72 @@ class FastAPIThreadedServer(uvicorn.Server):
should_exit: bool = False
is_running: bool = False

"""
Provide a channel to signal exceptions from the thread to our caller.
"""
_exception: Exception = None
_lock: threading.Lock = threading.Lock()
_thread: threading.Thread = None
_started: bool = False

def set_exception(self, ex):
    """
    Store ``ex`` in ``self._exception`` while holding ``self._lock``.

    Args:
        ex: the exception object to record (whatever the caller passes is stored as-is).
    """
    lock = self._lock
    with lock:
        self._exception = ex

def get_exception(self):
    """
    Read ``self._exception`` while holding ``self._lock``.

    Returns:
        The value currently stored in ``self._exception``.
    """
    with self._lock:
        recorded = self._exception
    return recorded

def set_thread(self, thread):
    """
    Store ``thread`` in ``self._thread`` while holding ``self._lock``.

    Args:
        thread: the thread object to remember.
    """
    lock = self._lock
    with lock:
        self._thread = thread

def get_thread(self):
    """
    Read ``self._thread`` while holding ``self._lock``.

    Returns:
        The value currently stored in ``self._thread``.
    """
    with self._lock:
        current = self._thread
    return current

def set_started(self, started):
    """
    Store ``started`` in ``self._started`` while holding ``self._lock``.

    Args:
        started: the new value of the flag.
    """
    lock = self._lock
    with lock:
        self._started = started

def get_started(self):
    """
    Read ``self._started`` while holding ``self._lock``.

    Returns:
        The value currently stored in ``self._started``.
    """
    with self._lock:
        flag = self._started
    return flag

def install_signal_handlers(self):
    """
    Deliberate no-op that replaces the signal-handler installation of ``uvicorn.Server``, so that the threaded server does not interfere with the signal handling of the main application.
    """
    return None

async def startup(self, sockets: Optional[List[socket.socket]] = None) -> None:
    """
    Run the parent class's ``startup`` and afterwards mark this server as started via ``self.set_started(True)``.

    Args:
        sockets: forwarded unchanged to the parent ``startup``.
    Returns:
        Whatever the parent ``startup`` returned.
    """
    result = await super().startup(sockets)
    # Only reached when the parent startup returned without raising.
    self.set_started(True)
    return result

@contextlib.contextmanager
def run_in_thread(self, startup_timeout: float = 1.0):
    """
    Runs the server (``self.run``) in a separate daemon thread for the duration of a ``with`` block, so that the caller is not blocked while the server is running.

    Args:
        startup_timeout (float): maximum number of seconds to wait for ``self.get_started()`` to become true. Defaults to one second.
    Yields:
        threading.Thread: the thread executing ``self.run``; the caller can use ``is_alive()`` on it to check whether the server is still running.
    Raises:
        Exception: in case the server did not start (as signalled by self.get_started()) before the timeout expired or before the thread ended.
    """
    thread = threading.Thread(target=self.run, daemon=True)
    thread.start()
    try:
        # A monotonic clock keeps the timeout correct even if the wall clock is adjusted.
        deadline = time.monotonic() + startup_timeout
        while not self.get_started() and time.monotonic() < deadline:
            if not thread.is_alive():
                # The thread already ended; waiting longer cannot change the outcome.
                break
            time.sleep(1e-3)
        # Re-check the flag: the thread may have set it right before the loop ended.
        if not self.get_started():
            raise Exception("failed to start server")
        yield thread
    finally:
        # Always ask the server to stop and wait for the thread, on success and on failure alike.
        self.should_exit = True
        thread.join()
Expand All @@ -128,9 +175,15 @@ def _wrapper_run(self):
"""
A wrapper method for the :func:`run_in_thread` context manager. This method is used internally by the ``start`` method to initiate the server's execution in a separate thread.
"""
with self.run_in_thread():
while not self.should_exit:
time.sleep(1e-3)
try:
with self.run_in_thread() as thread:
self.set_thread(thread)
while not self.should_exit:
if not thread.is_alive():
raise Exception("worker thread died")
time.sleep(1e-3)
except Exception as e:
self.set_exception(e)

def start(self):
"""
Expand Down Expand Up @@ -405,6 +458,24 @@ def info(self) -> "bittensor.AxonInfo":
placeholder2=0,
)

# Our instantiator should be able to test axon.exception to see if any
# exception occurred.
@property
def exception(self):
    """
    Returns the exception recorded for this axon, if any.

    An exception stored directly on the axon as ``self._exception`` takes precedence; otherwise the value of ``self.fast_server.get_exception()`` is returned.

    Returns:
        The recorded exception object, or whatever ``self.fast_server.get_exception()`` returns when the axon itself holds none.
    """
    # for future use: setting self._exception to signal an exception
    own = getattr(self, "_exception", None)
    # Compare against None instead of relying on truthiness, so that an
    # exception object that evaluates as false is still reported.
    if own is not None:
        return own
    return self.fast_server.get_exception()

# Our instantiator should be able to test axon.is_running() to see if all
# required threads etc are running.
def is_running(self):
    """
    Reports whether the thread registered on ``self.fast_server`` is alive.

    Returns:
        bool: ``False`` when ``self.fast_server.get_thread()`` returns ``None``, otherwise the result of that thread's ``is_alive()``.
    """
    worker = self.fast_server.get_thread()
    return False if worker is None else worker.is_alive()

def attach(
self,
forward_fn: Callable,
Expand Down

0 comments on commit 7bb5ae9

Please sign in to comment.