diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index e7463a0..e723956 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -57,7 +57,7 @@ def handle( self, workers: List[Process], args: WorkerArgs, - worker_func: Callable[[WorkerArgs, EventType], None], + worker_func: Callable[[WorkerArgs], None], ) -> None: """ This action reloads a single process. @@ -79,7 +79,7 @@ def handle( event: EventType = Event() new_process = Process( target=worker_func, - kwargs={"args": args, "event": event}, + kwargs={"args": args}, name=f"worker-{self.worker_num}", daemon=True, ) @@ -152,9 +152,8 @@ class ProcessManager: def __init__( self, args: WorkerArgs, - worker_function: Callable[[WorkerArgs, EventType], None], + worker_function: Callable[[WorkerArgs], None], observer: Optional[Observer] = None, # type: ignore[valid-type] - max_restarts: Optional[int] = None, ) -> None: self.worker_function = worker_function self.action_queue: "Queue[ProcessActionBase]" = Queue(-1) @@ -183,7 +182,7 @@ def prepare_workers(self) -> None: event = Event() work_proc = Process( target=self.worker_function, - kwargs={"args": self.args, "event": event}, + kwargs={"args": self.args}, name=f"worker-{process}", daemon=True, ) diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index 09c71b0..b28fd5a 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -3,7 +3,6 @@ import signal from concurrent.futures import ThreadPoolExecutor from multiprocessing import set_start_method -from multiprocessing.synchronize import Event from sys import platform from typing import Any, Optional, Type @@ -68,7 +67,7 @@ def get_receiver_type(args: WorkerArgs) -> Type[Receiver]: return receiver_type -def start_listen(args: WorkerArgs, event: Event) -> None: +def start_listen(args: WorkerArgs) -> None: """ This function starts actual listening process. @@ -109,9 +108,6 @@ def interrupt_handler(signum: int, _frame: Any) -> None: signal.signal(signal.SIGINT, interrupt_handler) signal.signal(signal.SIGTERM, interrupt_handler) - # Notify parent process, worker is ready - event.set() - if uvloop is not None: logger.debug("UVLOOP found. Using it as async runner") loop = uvloop.new_event_loop() # type: ignore @@ -163,7 +159,7 @@ def run_worker(args: WorkerArgs) -> Optional[int]: :returns: Optional status code. """ if platform == "darwin": - set_start_method("fork") + set_start_method("spawn") if args.configure_logging: logging.basicConfig( level=logging.getLevelName(args.log_level),